From 08aa4418c30cfc18ccc69a0f0f9cb9e17be6c196 Mon Sep 17 00:00:00 2001 From: Pierre Schmitz Date: Mon, 12 Aug 2013 09:28:15 +0200 Subject: Update to MediaWiki 1.21.1 --- includes/job/DoubleRedirectJob.php | 207 -------- includes/job/EmaillingJob.php | 46 -- includes/job/EnotifNotifyJob.php | 57 -- includes/job/Job.php | 416 +++++---------- includes/job/JobQueue.php | 435 ++++++++++++++++ includes/job/JobQueueAggregator.php | 139 +++++ includes/job/JobQueueAggregatorMemc.php | 117 +++++ includes/job/JobQueueAggregatorRedis.php | 165 ++++++ includes/job/JobQueueDB.php | 716 ++++++++++++++++++++++++++ includes/job/JobQueueGroup.php | 351 +++++++++++++ includes/job/README | 81 +++ includes/job/RefreshLinksJob.php | 202 -------- includes/job/UploadFromUrlJob.php | 179 ------- includes/job/jobs/AssembleUploadChunksJob.php | 118 +++++ includes/job/jobs/DoubleRedirectJob.php | 218 ++++++++ includes/job/jobs/DuplicateJob.php | 59 +++ includes/job/jobs/EmaillingJob.php | 47 ++ includes/job/jobs/EnotifNotifyJob.php | 58 +++ includes/job/jobs/HTMLCacheUpdateJob.php | 254 +++++++++ includes/job/jobs/NullJob.php | 60 +++ includes/job/jobs/PublishStashedFileJob.php | 130 +++++ includes/job/jobs/RefreshLinksJob.php | 226 ++++++++ includes/job/jobs/UploadFromUrlJob.php | 179 +++++++ 23 files changed, 3482 insertions(+), 978 deletions(-) delete mode 100644 includes/job/DoubleRedirectJob.php delete mode 100644 includes/job/EmaillingJob.php delete mode 100644 includes/job/EnotifNotifyJob.php create mode 100644 includes/job/JobQueue.php create mode 100644 includes/job/JobQueueAggregator.php create mode 100644 includes/job/JobQueueAggregatorMemc.php create mode 100644 includes/job/JobQueueAggregatorRedis.php create mode 100644 includes/job/JobQueueDB.php create mode 100644 includes/job/JobQueueGroup.php create mode 100644 includes/job/README delete mode 100644 includes/job/RefreshLinksJob.php delete mode 100644 includes/job/UploadFromUrlJob.php create mode 100644 includes/job/jobs/AssembleUploadChunksJob.php create mode 100644 includes/job/jobs/DoubleRedirectJob.php create mode 100644 includes/job/jobs/DuplicateJob.php create mode 100644 includes/job/jobs/EmaillingJob.php create mode 100644 includes/job/jobs/EnotifNotifyJob.php create mode 100644 includes/job/jobs/HTMLCacheUpdateJob.php create mode 100644 includes/job/jobs/NullJob.php create mode 100644 includes/job/jobs/PublishStashedFileJob.php create mode 100644 includes/job/jobs/RefreshLinksJob.php create mode 100644 includes/job/jobs/UploadFromUrlJob.php (limited to 'includes/job') diff --git a/includes/job/DoubleRedirectJob.php b/includes/job/DoubleRedirectJob.php deleted file mode 100644 index 08af9975..00000000 --- a/includes/job/DoubleRedirectJob.php +++ /dev/null @@ -1,207 +0,0 @@ -" - * @param $redirTitle Title: the title which has changed, redirects pointing to this title are fixed - * @param $destTitle bool Not used - */ - public static function fixRedirects( $reason, $redirTitle, $destTitle = false ) { - # Need to use the master to get the redirect table updated in the same transaction - $dbw = wfGetDB( DB_MASTER ); - $res = $dbw->select( - array( 'redirect', 'page' ), - array( 'page_namespace', 'page_title' ), - array( - 'page_id = rd_from', - 'rd_namespace' => $redirTitle->getNamespace(), - 'rd_title' => $redirTitle->getDBkey() - ), __METHOD__ ); - if ( !$res->numRows() ) { - return; - } - $jobs = array(); - foreach ( $res as $row ) { - $title = Title::makeTitle( $row->page_namespace, $row->page_title ); - if ( !$title ) { - continue; - } - - $jobs[] = new self( $title, array( - 'reason' => $reason, - 'redirTitle' => $redirTitle->getPrefixedDBkey() ) ); - # Avoid excessive memory usage - if ( count( $jobs ) > 10000 ) { - Job::batchInsert( $jobs ); - $jobs = array(); - } - } - Job::batchInsert( $jobs ); - } - - function __construct( $title, $params = false, $id = 0 ) { - parent::__construct( 'fixDoubleRedirect', $title, $params, $id ); - $this->reason = $params['reason']; - $this->redirTitle = Title::newFromText( $params['redirTitle'] ); - $this->destTitleText = !empty( $params['destTitle'] ) ? $params['destTitle'] : ''; - } - - /** - * @return bool - */ - function run() { - if ( !$this->redirTitle ) { - $this->setLastError( 'Invalid title' ); - return false; - } - - $targetRev = Revision::newFromTitle( $this->title, false, Revision::READ_LATEST ); - if ( !$targetRev ) { - wfDebug( __METHOD__.": target redirect already deleted, ignoring\n" ); - return true; - } - $text = $targetRev->getText(); - $currentDest = Title::newFromRedirect( $text ); - if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) { - wfDebug( __METHOD__.": Redirect has changed since the job was queued\n" ); - return true; - } - - # Check for a suppression tag (used e.g. in periodically archived discussions) - $mw = MagicWord::get( 'staticredirect' ); - if ( $mw->match( $text ) ) { - wfDebug( __METHOD__.": skipping: suppressed with __STATICREDIRECT__\n" ); - return true; - } - - # Find the current final destination - $newTitle = self::getFinalDestination( $this->redirTitle ); - if ( !$newTitle ) { - wfDebug( __METHOD__.": skipping: single redirect, circular redirect or invalid redirect destination\n" ); - return true; - } - if ( $newTitle->equals( $this->redirTitle ) ) { - # The redirect is already right, no need to change it - # This can happen if the page was moved back (say after vandalism) - wfDebug( __METHOD__.": skipping, already good\n" ); - } - - # Preserve fragment (bug 14904) - $newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(), - $currentDest->getFragment() ); - - # Fix the text - # Remember that redirect pages can have categories, templates, etc., - # so the regex has to be fairly general - $newText = preg_replace( '/ \[ \[ [^\]]* \] \] /x', - '[[' . $newTitle->getFullText() . ']]', - $text, 1 ); - - if ( $newText === $text ) { - $this->setLastError( 'Text unchanged???' ); - return false; - } - - # Save it - global $wgUser; - $oldUser = $wgUser; - $wgUser = $this->getUser(); - $article = WikiPage::factory( $this->title ); - $reason = wfMessage( 'double-redirect-fixed-' . $this->reason, - $this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText() - )->inContentLanguage()->text(); - $article->doEdit( $newText, $reason, EDIT_UPDATE | EDIT_SUPPRESS_RC, false, $this->getUser() ); - $wgUser = $oldUser; - - return true; - } - - /** - * Get the final destination of a redirect - * - * @param $title Title - * - * @return bool if the specified title is not a redirect, or if it is a circular redirect - */ - public static function getFinalDestination( $title ) { - $dbw = wfGetDB( DB_MASTER ); - - $seenTitles = array(); # Circular redirect check - $dest = false; - - while ( true ) { - $titleText = $title->getPrefixedDBkey(); - if ( isset( $seenTitles[$titleText] ) ) { - wfDebug( __METHOD__, "Circular redirect detected, aborting\n" ); - return false; - } - $seenTitles[$titleText] = true; - - $row = $dbw->selectRow( - array( 'redirect', 'page' ), - array( 'rd_namespace', 'rd_title' ), - array( - 'rd_from=page_id', - 'page_namespace' => $title->getNamespace(), - 'page_title' => $title->getDBkey() - ), __METHOD__ ); - if ( !$row ) { - # No redirect from here, chain terminates - break; - } else { - $dest = $title = Title::makeTitle( $row->rd_namespace, $row->rd_title ); - } - } - return $dest; - } - - /** - * Get a user object for doing edits, from a request-lifetime cache - * @return User - */ - function getUser() { - if ( !self::$user ) { - self::$user = User::newFromName( wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text(), false ); - # FIXME: newFromName could return false on a badly configured wiki. - if ( !self::$user->isLoggedIn() ) { - self::$user->addToDatabase(); - } - } - return self::$user; - } -} - diff --git a/includes/job/EmaillingJob.php b/includes/job/EmaillingJob.php deleted file mode 100644 index d3599882..00000000 --- a/includes/job/EmaillingJob.php +++ /dev/null @@ -1,46 +0,0 @@ -params['to'], - $this->params['from'], - $this->params['subj'], - $this->params['body'], - $this->params['replyto'] - ); - return true; - } - -} diff --git a/includes/job/EnotifNotifyJob.php b/includes/job/EnotifNotifyJob.php deleted file mode 100644 index b4c925e9..00000000 --- a/includes/job/EnotifNotifyJob.php +++ /dev/null @@ -1,57 +0,0 @@ -params['editorID'] ) && $this->params['editorID'] ) { - $editor = User::newFromId( $this->params['editorID'] ); - // B/C, only the name might be given. - } else { - # FIXME: newFromName could return false on a badly configured wiki. - $editor = User::newFromName( $this->params['editor'], false ); - } - $enotif->actuallyNotifyOnPageChange( - $editor, - $this->title, - $this->params['timestamp'], - $this->params['summary'], - $this->params['minorEdit'], - $this->params['oldid'], - $this->params['watchers'] - ); - return true; - } - -} diff --git a/includes/job/Job.php b/includes/job/Job.php index d777a5d4..bcf582e7 100644 --- a/includes/job/Job.php +++ b/includes/job/Job.php @@ -23,11 +23,11 @@ /** * Class to both describe a background job and handle jobs. + * The queue aspects of this class are now deprecated. * * @ingroup JobQueue */ abstract class Job { - /** * @var Title */ @@ -39,6 +39,9 @@ abstract class Job { $removeDuplicates, $error; + /** @var Array Additional queue metadata */ + public $metadata = array(); + /*------------------------------------------------------------------------- * Abstract functions *------------------------------------------------------------------------*/ @@ -47,176 +50,23 @@ abstract class Job { * Run the job * @return boolean success */ - abstract function run(); + abstract public function run(); /*------------------------------------------------------------------------- * Static functions *------------------------------------------------------------------------*/ - /** - * Pop a job of a certain type. This tries less hard than pop() to - * actually find a job; it may be adversely affected by concurrent job - * runners. - * - * @param $type string - * - * @return Job - */ - static function pop_type( $type ) { - wfProfilein( __METHOD__ ); - - $dbw = wfGetDB( DB_MASTER ); - - $dbw->begin( __METHOD__ ); - - $row = $dbw->selectRow( - 'job', - '*', - array( 'job_cmd' => $type ), - __METHOD__, - array( 'LIMIT' => 1, 'FOR UPDATE' ) - ); - - if ( $row === false ) { - $dbw->commit( __METHOD__ ); - wfProfileOut( __METHOD__ ); - return false; - } - - /* Ensure we "own" this row */ - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - $affected = $dbw->affectedRows(); - $dbw->commit( __METHOD__ ); - - if ( $affected == 0 ) { - wfProfileOut( __METHOD__ ); - return false; - } - - wfIncrStats( 'job-pop' ); - $namespace = $row->job_namespace; - $dbkey = $row->job_title; - $title = Title::makeTitleSafe( $namespace, $dbkey ); - $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), - $row->job_id ); - - $job->removeDuplicates(); - - wfProfileOut( __METHOD__ ); - return $job; - } - - /** - * Pop a job off the front of the queue - * - * @param $offset Integer: Number of jobs to skip - * @return Job or false if there's no jobs - */ - static function pop( $offset = 0 ) { - wfProfileIn( __METHOD__ ); - - $dbr = wfGetDB( DB_SLAVE ); - - /* Get a job from the slave, start with an offset, - scan full set afterwards, avoid hitting purged rows - - NB: If random fetch previously was used, offset - will always be ahead of few entries - */ - - $conditions = self::defaultQueueConditions(); - - $offset = intval( $offset ); - $options = array( 'ORDER BY' => 'job_id', 'USE INDEX' => 'PRIMARY' ); - - $row = $dbr->selectRow( 'job', '*', - array_merge( $conditions, array( "job_id >= $offset" ) ), - __METHOD__, - $options - ); - - // Refetching without offset is needed as some of job IDs could have had delayed commits - // and have lower IDs than jobs already executed, blame concurrency :) - // - if ( $row === false ) { - if ( $offset != 0 ) { - $row = $dbr->selectRow( 'job', '*', $conditions, __METHOD__, $options ); - } - - if ( $row === false ) { - wfProfileOut( __METHOD__ ); - return false; - } - } - - // Try to delete it from the master - $dbw = wfGetDB( DB_MASTER ); - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - $affected = $dbw->affectedRows(); - - if ( !$affected ) { - // Failed, someone else beat us to it - // Try getting a random row - $row = $dbw->selectRow( 'job', array( 'minjob' => 'MIN(job_id)', - 'maxjob' => 'MAX(job_id)' ), '1=1', __METHOD__ ); - if ( $row === false || is_null( $row->minjob ) || is_null( $row->maxjob ) ) { - // No jobs to get - wfProfileOut( __METHOD__ ); - return false; - } - // Get the random row - $row = $dbw->selectRow( 'job', '*', - 'job_id >= ' . mt_rand( $row->minjob, $row->maxjob ), __METHOD__ ); - if ( $row === false ) { - // Random job gone before we got the chance to select it - // Give up - wfProfileOut( __METHOD__ ); - return false; - } - // Delete the random row - $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); - $affected = $dbw->affectedRows(); - - if ( !$affected ) { - // Random job gone before we exclusively deleted it - // Give up - wfProfileOut( __METHOD__ ); - return false; - } - } - - // If execution got to here, there's a row in $row that has been deleted from the database - // by this thread. Hence the concurrent pop was successful. - wfIncrStats( 'job-pop' ); - $namespace = $row->job_namespace; - $dbkey = $row->job_title; - $title = Title::makeTitleSafe( $namespace, $dbkey ); - - if ( is_null( $title ) ) { - wfProfileOut( __METHOD__ ); - return false; - } - - $job = Job::factory( $row->job_cmd, $title, Job::extractBlob( $row->job_params ), $row->job_id ); - - // Remove any duplicates it may have later in the queue - $job->removeDuplicates(); - - wfProfileOut( __METHOD__ ); - return $job; - } - /** * Create the appropriate object to handle a specific job * - * @param $command String: Job command + * @param string $command Job command * @param $title Title: Associated title - * @param $params Array|bool: Job parameters - * @param $id Int: Job identifier + * @param array|bool $params Job parameters + * @param int $id Job identifier * @throws MWException * @return Job */ - static function factory( $command, Title $title, $params = false, $id = 0 ) { + public static function factory( $command, Title $title, $params = false, $id = 0 ) { global $wgJobClasses; if( isset( $wgJobClasses[$command] ) ) { $class = $wgJobClasses[$command]; @@ -225,30 +75,6 @@ abstract class Job { throw new MWException( "Invalid job command `{$command}`" ); } - /** - * @param $params - * @return string - */ - static function makeBlob( $params ) { - if ( $params !== false ) { - return serialize( $params ); - } else { - return ''; - } - } - - /** - * @param $blob - * @return bool|mixed - */ - static function extractBlob( $blob ) { - if ( (string)$blob !== '' ) { - return unserialize( $blob ); - } else { - return false; - } - } - /** * Batch-insert a group of jobs into the queue. * This will be wrapped in a transaction with a forced commit. @@ -256,34 +82,12 @@ abstract class Job { * This may add duplicate at insert time, but they will be * removed later on, when the first one is popped. * - * @param $jobs array of Job objects + * @param array $jobs of Job objects + * @return bool + * @deprecated 1.21 */ - static function batchInsert( $jobs ) { - if ( !count( $jobs ) ) { - return; - } - $dbw = wfGetDB( DB_MASTER ); - $rows = array(); - - /** - * @var $job Job - */ - foreach ( $jobs as $job ) { - $rows[] = $job->insertFields(); - if ( count( $rows ) >= 50 ) { - # Do a small transaction to avoid slave lag - $dbw->begin( __METHOD__ ); - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - $dbw->commit( __METHOD__ ); - $rows = array(); - } - } - if ( $rows ) { // last chunk - $dbw->begin( __METHOD__ ); - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - $dbw->commit( __METHOD__ ); - } - wfIncrStats( 'job-insert', count( $jobs ) ); + public static function batchInsert( $jobs ) { + return JobQueueGroup::singleton()->push( $jobs ); } /** @@ -293,46 +97,36 @@ abstract class Job { * be rolled-back as part of a larger transaction. However, * large batches of jobs can cause slave lag. * - * @param $jobs array of Job objects + * @param array $jobs of Job objects + * @return bool + * @deprecated 1.21 */ - static function safeBatchInsert( $jobs ) { - if ( !count( $jobs ) ) { - return; - } - $dbw = wfGetDB( DB_MASTER ); - $rows = array(); - foreach ( $jobs as $job ) { - $rows[] = $job->insertFields(); - if ( count( $rows ) >= 500 ) { - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - $rows = array(); - } - } - if ( $rows ) { // last chunk - $dbw->insert( 'job', $rows, __METHOD__, 'IGNORE' ); - } - wfIncrStats( 'job-insert', count( $jobs ) ); + public static function safeBatchInsert( $jobs ) { + return JobQueueGroup::singleton()->push( $jobs, JobQueue::QoS_Atomic ); } - /** - * SQL conditions to apply on most JobQueue queries + * Pop a job of a certain type. This tries less hard than pop() to + * actually find a job; it may be adversely affected by concurrent job + * runners. * - * Whenever we exclude jobs types from the default queue, we want to make - * sure that queries to the job queue actually ignore them. + * @param $type string + * @return Job|bool Returns false if there are no jobs + * @deprecated 1.21 + */ + public static function pop_type( $type ) { + return JobQueueGroup::singleton()->get( $type )->pop(); + } + + /** + * Pop a job off the front of the queue. + * This is subject to $wgJobTypesExcludedFromDefaultQueue. * - * @return array SQL conditions suitable for Database:: methods + * @return Job or false if there's no jobs + * @deprecated 1.21 */ - static function defaultQueueConditions( ) { - global $wgJobTypesExcludedFromDefaultQueue; - $conditions = array(); - if ( count( $wgJobTypesExcludedFromDefaultQueue ) > 0 ) { - $dbr = wfGetDB( DB_SLAVE ); - foreach ( $wgJobTypesExcludedFromDefaultQueue as $cmdType ) { - $conditions[] = "job_cmd != " . $dbr->addQuotes( $cmdType ); - } - } - return $conditions; + public static function pop() { + return JobQueueGroup::singleton()->pop(); } /*------------------------------------------------------------------------- @@ -345,83 +139,131 @@ abstract class Job { * @param $params array|bool * @param $id int */ - function __construct( $command, $title, $params = false, $id = 0 ) { + public function __construct( $command, $title, $params = false, $id = 0 ) { $this->command = $command; $this->title = $title; $this->params = $params; $this->id = $id; - // A bit of premature generalisation - // Oh well, the whole class is premature generalisation really - $this->removeDuplicates = true; + $this->removeDuplicates = false; // expensive jobs may set this to true } /** - * Insert a single job into the queue. - * @return bool true on success + * @return integer May be 0 for jobs stored outside the DB */ - function insert() { - $fields = $this->insertFields(); + public function getId() { + return $this->id; + } - $dbw = wfGetDB( DB_MASTER ); + /** + * @return string + */ + public function getType() { + return $this->command; + } - if ( $this->removeDuplicates ) { - $res = $dbw->select( 'job', array( '1' ), $fields, __METHOD__ ); - if ( $dbw->numRows( $res ) ) { - return true; - } - } - wfIncrStats( 'job-insert' ); - return $dbw->insert( 'job', $fields, __METHOD__ ); + /** + * @return Title + */ + public function getTitle() { + return $this->title; } /** * @return array */ - protected function insertFields() { - $dbw = wfGetDB( DB_MASTER ); + public function getParams() { + return $this->params; + } + + /** + * @return bool Whether only one of each identical set of jobs should be run + */ + public function ignoreDuplicates() { + return $this->removeDuplicates; + } + + /** + * @return bool Whether this job can be retried on failure by job runners + */ + public function allowRetries() { + return true; + } + + /** + * Subclasses may need to override this to make duplication detection work + * + * @return Array Map of key/values + */ + public function getDeduplicationInfo() { + $info = array( + 'type' => $this->getType(), + 'namespace' => $this->getTitle()->getNamespace(), + 'title' => $this->getTitle()->getDBkey(), + 'params' => $this->getParams() + ); + // Identical jobs with different "root" jobs should count as duplicates + if ( is_array( $info['params'] ) ) { + unset( $info['params']['rootJobSignature'] ); + unset( $info['params']['rootJobTimestamp'] ); + } + return $info; + } + + /** + * @param string $key A key that identifies the task + * @return Array + */ + public static function newRootJobParams( $key ) { return array( - 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), - 'job_cmd' => $this->command, - 'job_namespace' => $this->title->getNamespace(), - 'job_title' => $this->title->getDBkey(), - 'job_timestamp' => $dbw->timestamp(), - 'job_params' => Job::makeBlob( $this->params ) + 'rootJobSignature' => sha1( $key ), + 'rootJobTimestamp' => wfTimestampNow() ); } /** - * Remove jobs in the job queue which are duplicates of this job. - * This is deadlock-prone and so starts its own transaction. + * @return Array */ - function removeDuplicates() { - if ( !$this->removeDuplicates ) { - return; - } + public function getRootJobParams() { + return array( + 'rootJobSignature' => isset( $this->params['rootJobSignature'] ) + ? $this->params['rootJobSignature'] + : null, + 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] ) + ? $this->params['rootJobTimestamp'] + : null + ); + } - $fields = $this->insertFields(); - unset( $fields['job_id'] ); - unset( $fields['job_timestamp'] ); - $dbw = wfGetDB( DB_MASTER ); - $dbw->begin( __METHOD__ ); - $dbw->delete( 'job', $fields, __METHOD__ ); - $affected = $dbw->affectedRows(); - $dbw->commit( __METHOD__ ); - if ( $affected ) { - wfIncrStats( 'job-dup-delete', $affected ); - } + /** + * Insert a single job into the queue. + * @return bool true on success + * @deprecated 1.21 + */ + public function insert() { + return JobQueueGroup::singleton()->push( $this ); } /** * @return string */ - function toString() { + public function toString() { $paramString = ''; if ( $this->params ) { foreach ( $this->params as $key => $value ) { if ( $paramString != '' ) { $paramString .= ' '; } + if ( is_array( $value ) ) { + $value = "array(" . count( $value ) . ")"; + } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) { + $value = "object(" . get_class( $value ) . ")"; + } + $value = (string)$value; + if ( mb_strlen( $value ) > 1024 ) { + $value = "string(" . mb_strlen( $value ) . ")"; + } + $paramString .= "$key=$value"; } } @@ -441,7 +283,7 @@ abstract class Job { $this->error = $error; } - function getLastError() { + public function getLastError() { return $this->error; } } diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php new file mode 100644 index 00000000..b0dd9258 --- /dev/null +++ b/includes/job/JobQueue.php @@ -0,0 +1,435 @@ +wiki = $params['wiki']; + $this->type = $params['type']; + $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0; + $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3; + if ( isset( $params['order'] ) && $params['order'] !== 'any' ) { + $this->order = $params['order']; + } else { + $this->order = $this->optimalOrder(); + } + if ( !in_array( $this->order, $this->supportedOrders() ) ) { + throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); + } + } + + /** + * Get a job queue object of the specified type. + * $params includes: + * - class : What job class to use (determines job type) + * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * - type : The name of the job types this queue handles + * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". + * If "fifo" is used, the queue will effectively be FIFO. Note that + * job completion will not appear to be exactly FIFO if there are multiple + * job runners since jobs can take different times to finish once popped. + * If "timestamp" is used, the queue will at least be loosely ordered + * by timestamp, allowing for some jobs to be popped off out of order. + * If "random" is used, pop() will pick jobs in random order. + * Note that it may only be weakly random (e.g. a lottery of the oldest X). + * If "any" is choosen, the queue will use whatever order is the fastest. + * This might be useful for improving concurrency for job acquisition. + * - claimTTL : If supported, the queue will recycle jobs that have been popped + * but not acknowledged as completed after this many seconds. Recycling + * of jobs simple means re-inserting them into the queue. Jobs can be + * attempted up to three times before being discarded. + * + * Queue classes should throw an exception if they do not support the options given. + * + * @param $params array + * @return JobQueue + * @throws MWException + */ + final public static function factory( array $params ) { + $class = $params['class']; + if ( !MWInit::classExists( $class ) ) { + throw new MWException( "Invalid job queue class '$class'." ); + } + $obj = new $class( $params ); + if ( !( $obj instanceof self ) ) { + throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." ); + } + return $obj; + } + + /** + * @return string Wiki ID + */ + final public function getWiki() { + return $this->wiki; + } + + /** + * @return string Job type that this queue handles + */ + final public function getType() { + return $this->type; + } + + /** + * @return string One of (random, timestamp, fifo) + */ + final public function getOrder() { + return $this->order; + } + + /** + * @return Array Subset of (random, timestamp, fifo) + */ + abstract protected function supportedOrders(); + + /** + * @return string One of (random, timestamp, fifo) + */ + abstract protected function optimalOrder(); + + /** + * Quickly check if the queue is empty (has no available jobs). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this might return false when there are actually no jobs. + * If pop() is called and returns false then it should correct the cache. Also, + * calling flushCaches() first prevents this. However, this affect is typically + * not distinguishable from the race condition between isEmpty() and pop(). + * + * @return bool + * @throws MWException + */ + final public function isEmpty() { + wfProfileIn( __METHOD__ ); + $res = $this->doIsEmpty(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::isEmpty() + * @return bool + */ + abstract protected function doIsEmpty(); + + /** + * Get the number of available (unacquired) jobs in the queue. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws MWException + */ + final public function getSize() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetSize(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getSize() + * @return integer + */ + abstract protected function doGetSize(); + + /** + * Get the number of acquired jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws MWException + */ + final public function getAcquiredCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAcquiredCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getAcquiredCount() + * @return integer + */ + abstract protected function doGetAcquiredCount(); + + /** + * Push a single jobs into the queue. + * This does not require $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::push() instead of this function. + * + * @param $jobs Job|Array + * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) + * @return bool Returns false on failure + * @throws MWException + */ + final public function push( $jobs, $flags = 0 ) { + return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); + } + + /** + * Push a batch of jobs into the queue. + * This does not require $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::push() instead of this function. + * + * @param array $jobs List of Jobs + * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) + * @return bool Returns false on failure + * @throws MWException + */ + final public function batchPush( array $jobs, $flags = 0 ) { + if ( !count( $jobs ) ) { + return true; // nothing to do + } + + foreach ( $jobs as $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + } + + wfProfileIn( __METHOD__ ); + $ok = $this->doBatchPush( $jobs, $flags ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueue::batchPush() + * @return bool + */ + abstract protected function doBatchPush( array $jobs, $flags ); + + /** + * Pop a job off of the queue. + * This requires $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::pop() instead of this function. + * + * @return Job|bool Returns false if there are no jobs + * @throws MWException + */ + final public function pop() { + global $wgJobClasses; + + if ( $this->wiki !== wfWikiID() ) { + throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." ); + } elseif ( !isset( $wgJobClasses[$this->type] ) ) { + // Do not pop jobs if there is no class for the queue type + throw new MWException( "Unrecognized job type '{$this->type}'." ); + } + + wfProfileIn( __METHOD__ ); + $job = $this->doPop(); + wfProfileOut( __METHOD__ ); + return $job; + } + + /** + * @see JobQueue::pop() + * @return Job + */ + abstract protected function doPop(); + + /** + * Acknowledge that a job was completed. + * + * This does nothing for certain queue classes or if "claimTTL" is not set. + * Outside callers should use JobQueueGroup::ack() instead of this function. + * + * @param $job Job + * @return bool + * @throws MWException + */ + final public function ack( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $ok = $this->doAck( $job ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueue::ack() + * @return bool + */ + abstract protected function doAck( Job $job ); + + /** + * Register the "root job" of a given job into the queue for de-duplication. + * This should only be called right *after* all the new jobs have been inserted. + * This is used to turn older, duplicate, job entries into no-ops. The root job + * information will remain in the registry until it simply falls out of cache. + * + * This requires that $job has two special fields in the "params" array: + * - rootJobSignature : hash (e.g. SHA1) that identifies the task + * - rootJobTimestamp : TS_MW timestamp of this instance of the task + * + * A "root job" is a conceptual job that consist of potentially many smaller jobs + * that are actually inserted into the queue. For example, "refreshLinks" jobs are + * spawned when a template is edited. One can think of the task as "update links + * of pages that use template X" and an instance of that task as a "root job". + * However, what actually goes into the queue are potentially many refreshLinks2 jobs. + * Since these jobs include things like page ID ranges and DB master positions, and morph + * into smaller refreshLinks2 jobs recursively, simple duplicate detection (like job_sha1) + * for individual jobs being identical is not useful. + * + * In the case of "refreshLinks", if these jobs are still in the queue when the template + * is edited again, we want all of these old refreshLinks jobs for that template to become + * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing. + * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a + * previous "root job" for the same task of "update links of pages that use template X". + * + * This does nothing for certain queue classes. + * + * @param $job Job + * @return bool + * @throws MWException + */ + final public function deduplicateRootJob( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $ok = $this->doDeduplicateRootJob( $job ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueue::deduplicateRootJob() + * @param $job Job + * @return bool + */ + protected function doDeduplicateRootJob( Job $job ) { + return true; + } + + /** + * Wait for any slaves or backup servers to catch up. + * + * This does nothing for certain queue classes. + * + * @return void + * @throws MWException + */ + final public function waitForBackups() { + wfProfileIn( __METHOD__ ); + $this->doWaitForBackups(); + wfProfileOut( __METHOD__ ); + } + + /** + * @see JobQueue::waitForBackups() + * @return void + */ + protected function doWaitForBackups() {} + + /** + * Return a map of task names to task definition maps. + * A "task" is a fast periodic queue maintenance action. + * Mutually exclusive tasks must implement their own locking in the callback. + * + * Each task value is an associative array with: + * - name : the name of the task + * - callback : a PHP callable that performs the task + * - period : the period in seconds corresponding to the task frequency + * + * @return Array + */ + final public function getPeriodicTasks() { + $tasks = $this->doGetPeriodicTasks(); + foreach ( $tasks as $name => &$def ) { + $def['name'] = $name; + } + return $tasks; + } + + /** + * @see JobQueue::getPeriodicTasks() + * @return Array + */ + protected function doGetPeriodicTasks() { + return array(); + } + + /** + * Clear any process and persistent caches + * + * @return void + */ + final public function flushCaches() { + wfProfileIn( __METHOD__ ); + $this->doFlushCaches(); + wfProfileOut( __METHOD__ ); + } + + /** + * @see JobQueue::flushCaches() + * @return void + */ + protected function doFlushCaches() {} + + /** + * Get an iterator to traverse over all of the jobs in this queue. + * This does not include jobs that are current acquired. In general, + * this should only be called on a queue that is no longer being popped. + * + * @return Iterator|Traversable|Array + * @throws MWException + */ + abstract public function getAllQueuedJobs(); + + /** + * Namespace the queue with a key to isolate it for testing + * + * @param $key string + * @return void + * @throws MWException + */ + public function setTestingPrefix( $key ) { + throw new MWException( "Queue namespacing not supported for this queue type." ); + } +} diff --git a/includes/job/JobQueueAggregator.php b/includes/job/JobQueueAggregator.php new file mode 100644 index 00000000..3dba3c53 --- /dev/null +++ b/includes/job/JobQueueAggregator.php @@ -0,0 +1,139 @@ +doNotifyQueueEmpty( $wiki, $type ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueueAggregator::notifyQueueEmpty() + */ + abstract protected function doNotifyQueueEmpty( $wiki, $type ); + + /** + * Mark a queue as being non-empty + * + * @param string $wiki + * @param string $type + * @return bool Success + */ + final public function notifyQueueNonEmpty( $wiki, $type ) { + wfProfileIn( __METHOD__ ); + $ok = $this->doNotifyQueueNonEmpty( $wiki, $type ); + wfProfileOut( __METHOD__ ); + return $ok; + } + + /** + * @see JobQueueAggregator::notifyQueueNonEmpty() + */ + abstract protected function doNotifyQueueNonEmpty( $wiki, $type ); + + /** + * Get the list of all of the queues with jobs + * + * @return Array (job type => (list of wiki IDs)) + */ + final public function getAllReadyWikiQueues() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAllReadyWikiQueues(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueueAggregator::getAllReadyWikiQueues() + */ + abstract protected function doGetAllReadyWikiQueues(); + + /** + * Get all databases that have a pending job. + * This poll all the queues and is this expensive. + * + * @return Array (job type => (list of wiki IDs)) + */ + protected function findPendingWikiQueues() { + global $wgLocalDatabases; + + $pendingDBs = array(); // (job type => (db list)) + foreach ( $wgLocalDatabases as $db ) { + foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) { + $pendingDBs[$type][] = $db; + } + } + + return $pendingDBs; + } +} diff --git a/includes/job/JobQueueAggregatorMemc.php b/includes/job/JobQueueAggregatorMemc.php new file mode 100644 index 00000000..4b82cf92 --- /dev/null +++ b/includes/job/JobQueueAggregatorMemc.php @@ -0,0 +1,117 @@ +cache = isset( $params['objectCache'] ) + ? wfGetCache( $params['objectCache'] ) + : wfGetMainCache(); + $this->cacheTTL = isset( $params['cacheTTL'] ) ? $params['cacheTTL'] : 180; // 3 min + } + + /** + * @see JobQueueAggregator::doNotifyQueueEmpty() + */ + protected function doNotifyQueueEmpty( $wiki, $type ) { + $key = $this->getReadyQueueCacheKey(); + // Delist the queue from the "ready queue" list + if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock + $curInfo = $this->cache->get( $key ); + if ( is_array( $curInfo ) && isset( $curInfo['pendingDBs'][$type] ) ) { + if ( in_array( $wiki, $curInfo['pendingDBs'][$type] ) ) { + $curInfo['pendingDBs'][$type] = array_diff( + $curInfo['pendingDBs'][$type], array( $wiki ) ); + $this->cache->set( $key, $curInfo ); + } + } + $this->cache->delete( "$key:lock" ); // unlock + } + return true; + } + + /** + * @see JobQueueAggregator::doNotifyQueueNonEmpty() + */ + protected function doNotifyQueueNonEmpty( $wiki, $type ) { + return true; // updated periodically + } + + /** + * @see JobQueueAggregator::doAllGetReadyWikiQueues() + */ + protected function doGetAllReadyWikiQueues() { + $key = $this->getReadyQueueCacheKey(); + // If the cache entry wasn't present, is stale, or in .1% of cases otherwise, + // regenerate the cache. Use any available stale cache if another process is + // currently regenerating the pending DB information. + $pendingDbInfo = $this->cache->get( $key ); + if ( !is_array( $pendingDbInfo ) + || ( time() - $pendingDbInfo['timestamp'] ) > $this->cacheTTL + || mt_rand( 0, 999 ) == 0 + ) { + if ( $this->cache->add( "$key:rebuild", 1, 1800 ) ) { // lock + $pendingDbInfo = array( + 'pendingDBs' => $this->findPendingWikiQueues(), + 'timestamp' => time() + ); + for ( $attempts=1; $attempts <= 25; ++$attempts ) { + if ( $this->cache->add( "$key:lock", 1, 60 ) ) { // lock + $this->cache->set( $key, $pendingDbInfo ); + $this->cache->delete( "$key:lock" ); // unlock + break; + } + } + $this->cache->delete( "$key:rebuild" ); // unlock + } + } + return is_array( $pendingDbInfo ) + ? $pendingDbInfo['pendingDBs'] + : array(); // cache is both empty and locked + } + + /** + * @return string + */ + private function getReadyQueueCacheKey() { + return "jobqueue:aggregator:ready-queues:v1"; // global + } +} diff --git a/includes/job/JobQueueAggregatorRedis.php b/includes/job/JobQueueAggregatorRedis.php new file mode 100644 index 00000000..74e9171c --- /dev/null +++ b/includes/job/JobQueueAggregatorRedis.php @@ -0,0 +1,165 @@ +server = $params['redisServer']; + $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); + } + + /** + * @see JobQueueAggregator::doNotifyQueueEmpty() + */ + protected function doNotifyQueueEmpty( $wiki, $type ) { + $conn = $this->getConnection(); + if ( !$conn ) { + return false; + } + try { + $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) ); + return true; + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + return false; + } + } + + /** + * @see JobQueueAggregator::doNotifyQueueNonEmpty() + */ + protected function doNotifyQueueNonEmpty( $wiki, $type ) { + $conn = $this->getConnection(); + if ( !$conn ) { + return false; + } + try { + $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() ); + return true; + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + return false; + } + } + + /** + * @see JobQueueAggregator::doAllGetReadyWikiQueues() + */ + protected function doGetAllReadyWikiQueues() { + $conn = $this->getConnection(); + if ( !$conn ) { + return array(); + } + try { + $conn->multi( Redis::PIPELINE ); + $conn->exists( $this->getReadyQueueKey() ); + $conn->hGetAll( $this->getReadyQueueKey() ); + list( $exists, $map ) = $conn->exec(); + + if ( $exists ) { // cache hit + $pendingDBs = array(); // (type => list of wikis) + foreach ( $map as $key => $time ) { + list( $type, $wiki ) = $this->dencQueueName( $key ); + $pendingDBs[$type][] = $wiki; + } + } else { // cache miss + $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis) + + $now = time(); + $map = array(); + foreach ( $pendingDBs as $type => $wikis ) { + foreach ( $wikis as $wiki ) { + $map[$this->encQueueName( $type, $wiki )] = $now; + } + } + $conn->hMSet( $this->getReadyQueueKey(), $map ); + } + + return $pendingDBs; + } catch ( RedisException $e ) { + $this->handleException( $conn, $e ); + return array(); + } + } + + /** + * Get a connection to the server that handles all sub-queues for this queue + * + * @return Array (server name, Redis instance) + * @throws MWException + */ + protected function getConnection() { + return $this->redisPool->getConnection( $this->server ); + } + + /** + * @param RedisConnRef $conn + * @param RedisException $e + * @return void + */ + protected function handleException( RedisConnRef $conn, $e ) { + $this->redisPool->handleException( $this->server, $conn, $e ); + } + + /** + * @return string + */ + private function getReadyQueueKey() { + return "jobqueue:aggregator:h-ready-queues:v1"; // global + } + + /** + * @param string $type + * @param string $wiki + * @return string + */ + private function encQueueName( $type, $wiki ) { + return rawurlencode( $type ) . '/' . rawurlencode( $wiki ); + } + + /** + * @param string $name + * @return string + */ + private function dencQueueName( $name ) { + list( $type, $wiki ) = explode( '/', $name, 2 ); + return array( rawurldecode( $type ), rawurldecode( $wiki ) ); + } +} diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php new file mode 100644 index 00000000..ff7f7abc --- /dev/null +++ b/includes/job/JobQueueDB.php @@ -0,0 +1,716 @@ +cluster = isset( $params['cluster'] ) ? $params['cluster'] : false; + } + + protected function supportedOrders() { + return array( 'random', 'timestamp', 'fifo' ); + } + + protected function optimalOrder() { + return 'random'; + } + + /** + * @see JobQueue::doIsEmpty() + * @return bool + */ + protected function doIsEmpty() { + global $wgMemc; + + $key = $this->getCacheKey( 'empty' ); + + $isEmpty = $wgMemc->get( $key ); + if ( $isEmpty === 'true' ) { + return true; + } elseif ( $isEmpty === 'false' ) { + return false; + } + + list( $dbr, $scope ) = $this->getSlaveDB(); + $found = $dbr->selectField( // unclaimed job + 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__ + ); + $wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG ); + + return !$found; + } + + /** + * @see JobQueue::doGetSize() + * @return integer + */ + protected function doGetSize() { + global $wgMemc; + + $key = $this->getCacheKey( 'size' ); + + $size = $wgMemc->get( $key ); + if ( is_int( $size ) ) { + return $size; + } + + list( $dbr, $scope ) = $this->getSlaveDB(); + $size = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, 'job_token' => '' ), + __METHOD__ + ); + $wgMemc->set( $key, $size, self::CACHE_TTL_SHORT ); + + return $size; + } + + /** + * @see JobQueue::doGetAcquiredCount() + * @return integer + */ + protected function doGetAcquiredCount() { + global $wgMemc; + + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + + $key = $this->getCacheKey( 'acquiredcount' ); + + $count = $wgMemc->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + list( $dbr, $scope ) = $this->getSlaveDB(); + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ), + __METHOD__ + ); + $wgMemc->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + + /** + * @see JobQueue::doBatchPush() + * @param array $jobs + * @param $flags + * @throws DBError|Exception + * @return bool + */ + protected function doBatchPush( array $jobs, $flags ) { + if ( count( $jobs ) ) { + list( $dbw, $scope ) = $this->getMasterDB(); + + $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated + $rowList = array(); // list of jobs for jobs that are are not de-duplicated + + foreach ( $jobs as $job ) { + $row = $this->insertFields( $job ); + if ( $job->ignoreDuplicates() ) { + $rowSet[$row['job_sha1']] = $row; + } else { + $rowList[] = $row; + } + } + + $key = $this->getCacheKey( 'empty' ); + $atomic = ( $flags & self::QoS_Atomic ); + + $dbw->onTransactionIdle( + function() use ( $dbw, $rowSet, $rowList, $atomic, $key, $scope + ) { + global $wgMemc; + + if ( $atomic ) { + $dbw->begin( __METHOD__ ); // wrap all the job additions in one transaction + } + try { + // Strip out any duplicate jobs that are already in the queue... + if ( count( $rowSet ) ) { + $res = $dbw->select( 'job', 'job_sha1', + array( + // No job_type condition since it's part of the job_sha1 hash + 'job_sha1' => array_keys( $rowSet ), + 'job_token' => '' // unclaimed + ), + __METHOD__ + ); + foreach ( $res as $row ) { + wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." ); + unset( $rowSet[$row->job_sha1] ); // already enqueued + } + } + // Build the full list of job rows to insert + $rows = array_merge( $rowList, array_values( $rowSet ) ); + // Insert the job rows in chunks to avoid slave lag... + foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { + $dbw->insert( 'job', $rowBatch, __METHOD__ ); + } + wfIncrStats( 'job-insert', count( $rows ) ); + wfIncrStats( 'job-insert-duplicate', + count( $rowSet ) + count( $rowList ) - count( $rows ) ); + } catch ( DBError $e ) { + if ( $atomic ) { + $dbw->rollback( __METHOD__ ); + } + throw $e; + } + if ( $atomic ) { + $dbw->commit( __METHOD__ ); + } + + $wgMemc->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG ); + } ); + } + + return true; + } + + /** + * @see JobQueue::doPop() + * @return Job|bool + */ + protected function doPop() { + global $wgMemc; + + if ( $wgMemc->get( $this->getCacheKey( 'empty' ) ) === 'true' ) { + return false; // queue is empty + } + + list( $dbw, $scope ) = $this->getMasterDB(); + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); + + $uuid = wfRandomString( 32 ); // pop attempt + $job = false; // job popped off + do { // retry when our row is invalid or deleted as a duplicate + // Try to reserve a row in the DB... + if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { + $row = $this->claimOldest( $uuid ); + } else { // random first + $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs + $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand + $row = $this->claimRandom( $uuid, $rand, $gte ); + } + // Check if we found a row to reserve... + if ( !$row ) { + $wgMemc->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG ); + break; // nothing to do + } + wfIncrStats( 'job-pop' ); + // Get the job object from the row... + $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title ); + if ( !$title ) { + $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ ); + wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." ); + continue; // try again + } + $job = Job::factory( $row->job_cmd, $title, + self::extractBlob( $row->job_params ), $row->job_id ); + $job->id = $row->job_id; // XXX: work around broken subclasses + // Flag this job as an old duplicate based on its "root" job... + if ( $this->isRootJobOldDuplicate( $job ) ) { + wfIncrStats( 'job-pop-duplicate' ); + $job = DuplicateJob::newFromJob( $job ); // convert to a no-op + } + break; // done + } while( true ); + + return $job; + } + + /** + * Reserve a row with a single UPDATE without holding row locks over RTTs... + * + * @param string $uuid 32 char hex string + * @param $rand integer Random unsigned integer (31 bits) + * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random) + * @return Row|false + */ + protected function claimRandom( $uuid, $rand, $gte ) { + global $wgMemc; + + list( $dbw, $scope ) = $this->getMasterDB(); + // Check cache to see if the queue has <= OFFSET items + $tinyQueue = $wgMemc->get( $this->getCacheKey( 'small' ) ); + + $row = false; // the row acquired + $invertedDirection = false; // whether one job_random direction was already scanned + // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT + // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is + // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot + // be used here with MySQL. + do { + if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows + // For small queues, using OFFSET will overshoot and return no rows more often. + // Instead, this uses job_random to pick a row (possibly checking both directions). + $ineq = $gte ? '>=' : '<='; + $dir = $gte ? 'ASC' : 'DESC'; + $row = $dbw->selectRow( 'job', '*', // find a random job + array( + 'job_cmd' => $this->type, + 'job_token' => '', // unclaimed + "job_random {$ineq} {$dbw->addQuotes( $rand )}" ), + __METHOD__, + array( 'ORDER BY' => "job_random {$dir}" ) + ); + if ( !$row && !$invertedDirection ) { + $gte = !$gte; + $invertedDirection = true; + continue; // try the other direction + } + } else { // table *may* have >= MAX_OFFSET rows + // Bug 42614: "ORDER BY job_random" with a job_random inequality causes high CPU + // in MySQL if there are many rows for some reason. This uses a small OFFSET + // instead of job_random for reducing excess claim retries. + $row = $dbw->selectRow( 'job', '*', // find a random job + array( + 'job_cmd' => $this->type, + 'job_token' => '', // unclaimed + ), + __METHOD__, + array( 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ) + ); + if ( !$row ) { + $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows + $wgMemc->set( $this->getCacheKey( 'small' ), 1, 30 ); + continue; // use job_random + } + } + if ( $row ) { // claim the job + $dbw->update( 'job', // update by PK + array( + 'job_token' => $uuid, + 'job_token_timestamp' => $dbw->timestamp(), + 'job_attempts = job_attempts+1' ), + array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ), + __METHOD__ + ); + // This might get raced out by another runner when claiming the previously + // selected row. The use of job_random should minimize this problem, however. + if ( !$dbw->affectedRows() ) { + $row = false; // raced out + } + } else { + break; // nothing to do + } + } while ( !$row ); + + return $row; + } + + /** + * Reserve a row with a single UPDATE without holding row locks over RTTs... + * + * @param string $uuid 32 char hex string + * @return Row|false + */ + protected function claimOldest( $uuid ) { + list( $dbw, $scope ) = $this->getMasterDB(); + + $row = false; // the row acquired + do { + if ( $dbw->getType() === 'mysql' ) { + // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the + // same table being changed in an UPDATE query in MySQL (gives Error: 1093). + // Oracle and Postgre have no such limitation. However, MySQL offers an + // alternative here by supporting ORDER BY + LIMIT for UPDATE queries. + $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " . + "SET " . + "job_token = {$dbw->addQuotes( $uuid ) }, " . + "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " . + "job_attempts = job_attempts+1 " . + "WHERE ( " . + "job_cmd = {$dbw->addQuotes( $this->type )} " . + "AND job_token = {$dbw->addQuotes( '' )} " . + ") ORDER BY job_id ASC LIMIT 1", + __METHOD__ + ); + } else { + // Use a subquery to find the job, within an UPDATE to claim it. + // This uses as much of the DB wrapper functions as possible. + $dbw->update( 'job', + array( + 'job_token' => $uuid, + 'job_token_timestamp' => $dbw->timestamp(), + 'job_attempts = job_attempts+1' ), + array( 'job_id = (' . + $dbw->selectSQLText( 'job', 'job_id', + array( 'job_cmd' => $this->type, 'job_token' => '' ), + __METHOD__, + array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) . + ')' + ), + __METHOD__ + ); + } + // Fetch any row that we just reserved... + if ( $dbw->affectedRows() ) { + $row = $dbw->selectRow( 'job', '*', + array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ + ); + if ( !$row ) { // raced out by duplicate job removal + wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." ); + } + } else { + break; // nothing to do + } + } while ( !$row ); + + return $row; + } + + /** + * Recycle or destroy any jobs that have been claimed for too long + * + * @return integer Number of jobs recycled/deleted + */ + public function recycleAndDeleteStaleJobs() { + global $wgMemc; + + $now = time(); + list( $dbw, $scope ) = $this->getMasterDB(); + $count = 0; // affected rows + + if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { + return $count; // already in progress + } + + // Remove claims on jobs acquired for too long if enabled... + if ( $this->claimTTL > 0 ) { + $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); + // Get the IDs of jobs that have be claimed but not finished after too long. + // These jobs can be recycled into the queue by expiring the claim. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', + array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale + "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left + __METHOD__ + ); + $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); + if ( count( $ids ) ) { + // Reset job_token for these jobs so that other runners will pick them up. + // Set the timestamp to the current time, as it is useful to now that the job + // was already tried before (the timestamp becomes the "released" time). + $dbw->update( 'job', + array( + 'job_token' => '', + 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release + array( + 'job_id' => $ids ), + __METHOD__ + ); + $count += $dbw->affectedRows(); + wfIncrStats( 'job-recycle', $dbw->affectedRows() ); + $wgMemc->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG ); + } + } + + // Just destroy any stale jobs... + $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); + $conds = array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale + ); + if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... + $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; + } + // Get the IDs of jobs that are considered stale and should be removed. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); + $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) ); + if ( count( $ids ) ) { + $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ ); + $count += $dbw->affectedRows(); + } + + $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); + + return $count; + } + + /** + * @see JobQueue::doAck() + * @param Job $job + * @throws MWException + * @return Job|bool + */ + protected function doAck( Job $job ) { + if ( !$job->getId() ) { + throw new MWException( "Job of type '{$job->getType()}' has no ID." ); + } + + list( $dbw, $scope ) = $this->getMasterDB(); + $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); + + // Delete a row with a single DELETE without holding row locks over RTTs... + $dbw->delete( 'job', + array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ), __METHOD__ ); + + return true; + } + + /** + * @see JobQueue::doDeduplicateRootJob() + * @param Job $job + * @throws MWException + * @return bool + */ + protected function doDeduplicateRootJob( Job $job ) { + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); + } + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + list( $dbw, $scope ) = $this->getMasterDB(); + $dbw->onTransactionIdle( function() use ( $params, $key, $scope ) { + global $wgMemc; + + $timestamp = $wgMemc->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } ); + + return true; + } + + /** + * Check if the "root" job of a given job has been superseded by a newer one + * + * @param $job Job + * @return bool + */ + protected function isRootJobOldDuplicate( Job $job ) { + global $wgMemc; + + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + return false; // job has no de-deplication info + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." ); + return false; + } + + // Get the last time this root job was enqueued + $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @see JobQueue::doWaitForBackups() + * @return void + */ + protected function doWaitForBackups() { + wfWaitForSlaves(); + } + + /** + * @return Array + */ + protected function doGetPeriodicTasks() { + return array( + 'recycleAndDeleteStaleJobs' => array( + 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ), + 'period' => ceil( $this->claimTTL / 2 ) + ) + ); + } + + /** + * @return void + */ + protected function doFlushCaches() { + global $wgMemc; + + foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) { + $wgMemc->delete( $this->getCacheKey( $type ) ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllQueuedJobs() { + list( $dbr, $scope ) = $this->getSlaveDB(); + return new MappedIterator( + $dbr->select( 'job', '*', array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ), + function( $row ) use ( $scope ) { + $job = Job::factory( + $row->job_cmd, + Title::makeTitle( $row->job_namespace, $row->job_title ), + strlen( $row->job_params ) ? unserialize( $row->job_params ) : false, + $row->job_id + ); + $job->id = $row->job_id; // XXX: work around broken subclasses + return $job; + } + ); + } + + /** + * @return Array (DatabaseBase, ScopedCallback) + */ + protected function getSlaveDB() { + return $this->getDB( DB_SLAVE ); + } + + /** + * @return Array (DatabaseBase, ScopedCallback) + */ + protected function getMasterDB() { + return $this->getDB( DB_MASTER ); + } + + /** + * @param $index integer (DB_SLAVE/DB_MASTER) + * @return Array (DatabaseBase, ScopedCallback) + */ + protected function getDB( $index ) { + $lb = ( $this->cluster !== false ) + ? wfGetLBFactory()->getExternalLB( $this->cluster, $this->wiki ) + : wfGetLB( $this->wiki ); + $conn = $lb->getConnection( $index, array(), $this->wiki ); + return array( + $conn, + new ScopedCallback( function() use ( $lb, $conn ) { + $lb->reuseConnection( $conn ); + } ) + ); + } + + /** + * @param $job Job + * @return array + */ + protected function insertFields( Job $job ) { + list( $dbw, $scope ) = $this->getMasterDB(); + return array( + // Fields that describe the nature of the job + 'job_cmd' => $job->getType(), + 'job_namespace' => $job->getTitle()->getNamespace(), + 'job_title' => $job->getTitle()->getDBkey(), + 'job_params' => self::makeBlob( $job->getParams() ), + // Additional job metadata + 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), + 'job_timestamp' => $dbw->timestamp(), + 'job_sha1' => wfBaseConvert( + sha1( serialize( $job->getDeduplicationInfo() ) ), + 16, 36, 31 + ), + 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) + ); + } + + /** + * @return string + */ + private function getCacheKey( $property ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + private function getRootJobCacheKey( $signature ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); + } + + /** + * @param $params + * @return string + */ + protected static function makeBlob( $params ) { + if ( $params !== false ) { + return serialize( $params ); + } else { + return ''; + } + } + + /** + * @param $blob + * @return bool|mixed + */ + protected static function extractBlob( $blob ) { + if ( (string)$blob !== '' ) { + return unserialize( $blob ); + } else { + return false; + } + } +} diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php new file mode 100644 index 00000000..351c71a3 --- /dev/null +++ b/includes/job/JobQueueGroup.php @@ -0,0 +1,351 @@ +wiki = $wiki; + $this->cache = new ProcessCacheLRU( 10 ); + } + + /** + * @param string $wiki Wiki ID + * @return JobQueueGroup + */ + public static function singleton( $wiki = false ) { + $wiki = ( $wiki === false ) ? wfWikiID() : $wiki; + if ( !isset( self::$instances[$wiki] ) ) { + self::$instances[$wiki] = new self( $wiki ); + } + return self::$instances[$wiki]; + } + + /** + * Destroy the singleton instances + * + * @return void + */ + public static function destroySingletons() { + self::$instances = array(); + } + + /** + * Get the job queue object for a given queue type + * + * @param $type string + * @return JobQueue + */ + public function get( $type ) { + global $wgJobTypeConf; + + $conf = array( 'wiki' => $this->wiki, 'type' => $type ); + if ( isset( $wgJobTypeConf[$type] ) ) { + $conf = $conf + $wgJobTypeConf[$type]; + } else { + $conf = $conf + $wgJobTypeConf['default']; + } + + return JobQueue::factory( $conf ); + } + + /** + * Insert jobs into the respective queues of with the belong. + * + * This inserts the jobs into the queue specified by $wgJobTypeConf + * and updates the aggregate job queue information cache as needed. + * + * @param $jobs Job|array A single Job or a list of Jobs + * @throws MWException + * @return bool + */ + public function push( $jobs ) { + $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); + + $jobsByType = array(); // (job type => list of jobs) + foreach ( $jobs as $job ) { + if ( $job instanceof Job ) { + $jobsByType[$job->getType()][] = $job; + } else { + throw new MWException( "Attempted to push a non-Job object into a queue." ); + } + } + + $ok = true; + foreach ( $jobsByType as $type => $jobs ) { + if ( $this->get( $type )->push( $jobs ) ) { + JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type ); + } else { + $ok = false; + } + } + + if ( $this->cache->has( 'queues-ready', 'list' ) ) { + $list = $this->cache->get( 'queues-ready', 'list' ); + if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) { + $this->cache->clear( 'queues-ready' ); + } + } + + return $ok; + } + + /** + * Pop a job off one of the job queues + * + * This pops a job off a queue as specified by $wgJobTypeConf and + * updates the aggregate job queue information cache as needed. + * + * @param $qtype integer|string JobQueueGroup::TYPE_DEFAULT or type string + * @param $flags integer Bitfield of JobQueueGroup::USE_* constants + * @return Job|bool Returns false on failure + */ + public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) { + if ( is_string( $qtype ) ) { // specific job type + $job = $this->get( $qtype )->pop(); + if ( !$job ) { + JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype ); + } + return $job; + } else { // any job in the "default" jobs types + if ( $flags & self::USE_CACHE ) { + if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { + $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() ); + } + $types = $this->cache->get( 'queues-ready', 'list' ); + } else { + $types = $this->getQueuesWithJobs(); + } + + if ( $qtype == self::TYPE_DEFAULT ) { + $types = array_intersect( $types, $this->getDefaultQueueTypes() ); + } + shuffle( $types ); // avoid starvation + + foreach ( $types as $type ) { // for each queue... + $job = $this->get( $type )->pop(); + if ( $job ) { // found + return $job; + } else { // not found + JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type ); + $this->cache->clear( 'queues-ready' ); + } + } + + return false; // no jobs found + } + } + + /** + * Acknowledge that a job was completed + * + * @param $job Job + * @return bool + */ + public function ack( Job $job ) { + return $this->get( $job->getType() )->ack( $job ); + } + + /** + * Register the "root job" of a given job into the queue for de-duplication. + * This should only be called right *after* all the new jobs have been inserted. + * + * @param $job Job + * @return bool + */ + public function deduplicateRootJob( Job $job ) { + return $this->get( $job->getType() )->deduplicateRootJob( $job ); + } + + /** + * Wait for any slaves or backup queue servers to catch up. + * + * This does nothing for certain queue classes. + * + * @return void + * @throws MWException + */ + public function waitForBackups() { + global $wgJobTypeConf; + + wfProfileIn( __METHOD__ ); + // Try to avoid doing this more than once per queue storage medium + foreach ( $wgJobTypeConf as $type => $conf ) { + $this->get( $type )->waitForBackups(); + } + wfProfileOut( __METHOD__ ); + } + + /** + * Get the list of queue types + * + * @return array List of strings + */ + public function getQueueTypes() { + return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) ); + } + + /** + * Get the list of default queue types + * + * @return array List of strings + */ + public function getDefaultQueueTypes() { + global $wgJobTypesExcludedFromDefaultQueue; + + return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); + } + + /** + * Get the list of job types that have non-empty queues + * + * @return Array List of job types that have non-empty queues + */ + public function getQueuesWithJobs() { + $types = array(); + foreach ( $this->getQueueTypes() as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } + return $types; + } + + /** + * Check if jobs should not be popped of a queue right now. + * This is only used for performance, such as to avoid spamming + * the queue with many sub-jobs before they actually get run. + * + * @param $type string + * @return bool + */ + public function isQueueDeprioritized( $type ) { + if ( $type === 'refreshLinks2' ) { + // Don't keep converting refreshLinks2 => refreshLinks jobs if the + // later jobs have not been done yet. This helps throttle queue spam. + return !$this->get( 'refreshLinks' )->isEmpty(); + } + return false; + } + + /** + * Execute any due periodic queue maintenance tasks for all queues. + * + * A task is "due" if the time ellapsed since the last run is greater than + * the defined run period. Concurrent calls to this function will cause tasks + * to be attempted twice, so they may need their own methods of mutual exclusion. + * + * @return integer Number of tasks run + */ + public function executeReadyPeriodicTasks() { + global $wgMemc; + + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $key = wfForeignMemcKey( $db, $prefix, 'jobqueuegroup', 'taskruns', 'v1' ); + $lastRuns = $wgMemc->get( $key ); // (queue => task => UNIX timestamp) + + $count = 0; + $tasksRun = array(); // (queue => task => UNIX timestamp) + foreach ( $this->getQueueTypes() as $type ) { + $queue = $this->get( $type ); + foreach ( $queue->getPeriodicTasks() as $task => $definition ) { + if ( $definition['period'] <= 0 ) { + continue; // disabled + } elseif ( !isset( $lastRuns[$type][$task] ) + || $lastRuns[$type][$task] < ( time() - $definition['period'] ) ) + { + if ( call_user_func( $definition['callback'] ) !== null ) { + $tasksRun[$type][$task] = time(); + ++$count; + } + } + } + } + + $wgMemc->merge( $key, function( $cache, $key, $lastRuns ) use ( $tasksRun ) { + if ( is_array( $lastRuns ) ) { + foreach ( $tasksRun as $type => $tasks ) { + foreach ( $tasks as $task => $timestamp ) { + if ( !isset( $lastRuns[$type][$task] ) + || $timestamp > $lastRuns[$type][$task] ) + { + $lastRuns[$type][$task] = $timestamp; + } + } + } + } else { + $lastRuns = $tasksRun; + } + return $lastRuns; + } ); + + return $count; + } + + /** + * @param $name string + * @return mixed + */ + private function getCachedConfigVar( $name ) { + global $wgConf, $wgMemc; + + if ( $this->wiki === wfWikiID() ) { + return $GLOBALS[$name]; // common case + } else { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $key = wfForeignMemcKey( $db, $prefix, 'configvalue', $name ); + $value = $wgMemc->get( $key ); // ('v' => ...) or false + if ( is_array( $value ) ) { + return $value['v']; + } else { + $value = $wgConf->getConfig( $this->wiki, $name ); + $wgMemc->set( $key, array( 'v' => $value ), 86400 + mt_rand( 0, 86400 ) ); + return $value; + } + } + } +} diff --git a/includes/job/README b/includes/job/README new file mode 100644 index 00000000..c11d5a78 --- /dev/null +++ b/includes/job/README @@ -0,0 +1,81 @@ +/*! +\ingroup JobQueue +\page jobqueue_design Job queue design + +Notes on the Job queuing system architecture. + +\section intro Introduction + +The data model consist of the following main components: +* The Job object represents a particular deferred task that happens in the + background. All jobs subclass the Job object and put the main logic in the + function called run(). +* The JobQueue object represents a particular queue of jobs of a certain type. + For example there may be a queue for email jobs and a queue for squid purge + jobs. + +\section jobqueue Job queues + +Each job type has its own queue and is associated to a storage medium. One +queue might save its jobs in redis while another one uses would use a database. + +Storage medium are defined in a queue class. Before using it, you must +define in $wgJobTypeConf a mapping of the job type to a queue class. + +The factory class JobQueueGroup provides helper functions: +- getting the queue for a given job +- route new job insertions to the proper queue + +The following queue classes are available: +* JobQueueDB (stores jobs in the `job` table in a database) +* JobQueueRedis (stores jobs in a redis server) + +All queue classes support some basic operations (though some may be no-ops): +* enqueueing a batch of jobs +* dequeueing a single job +* acknowledging a job is completed +* checking if the queue is empty + +Some queue classes (like JobQueueDB) may dequeue jobs in random order while other +queues might dequeue jobs in exact FIFO order. Callers should thus not assume jobs +are executed in FIFO order. + +Also note that not all queue classes will have the same reliability guarantees. +In-memory queues may lose data when restarted depending on snapshot and journal +settings (including journal fsync() frequency). Some queue types may totally remove +jobs when dequeued while leaving the ack() function as a no-op; if a job is +dequeued by a job runner, which crashes before completion, the job will be +lost. Some jobs, like purging squid caches after a template change, may not +require durable queues, whereas other jobs might be more important. + +\section aggregator Job queue aggregator + +The aggregators are used by nextJobDB.php, which is a script that will return a +random ready queue (on any wiki in the farm) that can be used with runJobs.php. +This can be used in conjunction with any scripts that handle wiki farm job queues. +Note that $wgLocalDatabases defines what wikis are in the wiki farm. + +Since each job type has its own queue, and wiki-farms may have many wikis, +there might be a large number of queues to keep track of. To avoid wasting +large amounts of time polling empty queues, aggregators exists to keep track +of which queues are ready. + +The following queue aggregator classes are available: +* JobQueueAggregatorMemc (uses $wgMemc to track ready queues) +* JobQueueAggregatorRedis (uses a redis server to track ready queues) + +Some aggregators cache data for a few minutes while others may be always up to date. +This can be an important factor for jobs that need a low pickup time (or latency). + +\section jobs Jobs + +Callers should also try to make jobs maintain correctness when executed twice. +This is useful for queues that actually implement ack(), since they may recycle +dequeued but un-acknowledged jobs back into the queue to be attempted again. If +a runner dequeues a job, runs it, but then crashes before calling ack(), the +job may be returned to the queue and run a second time. Jobs like cache purging can +happen several times without any correctness problems. However, a pathological case +would be if a bug causes the problem to systematically keep repeating. For example, +a job may always throw a DB error at the end of run(). This problem is trickier to +solve and more obnoxious for things like email jobs, for example. For such jobs, +it might be useful to use a queue that does not retry jobs. diff --git a/includes/job/RefreshLinksJob.php b/includes/job/RefreshLinksJob.php deleted file mode 100644 index b23951c6..00000000 --- a/includes/job/RefreshLinksJob.php +++ /dev/null @@ -1,202 +0,0 @@ -clear(); - - if ( is_null( $this->title ) ) { - $this->error = "refreshLinks: Invalid title"; - wfProfileOut( __METHOD__ ); - return false; - } - - # Wait for the DB of the current/next slave DB handle to catch up to the master. - # This way, we get the correct page_latest for templates or files that just changed - # milliseconds ago, having triggered this job to begin with. - if ( isset( $this->params['masterPos'] ) ) { - wfGetLB()->waitFor( $this->params['masterPos'] ); - } - - $revision = Revision::newFromTitle( $this->title, false, Revision::READ_NORMAL ); - if ( !$revision ) { - $this->error = 'refreshLinks: Article not found "' . - $this->title->getPrefixedDBkey() . '"'; - wfProfileOut( __METHOD__ ); - return false; // XXX: what if it was just deleted? - } - - self::runForTitleInternal( $this->title, $revision, __METHOD__ ); - - wfProfileOut( __METHOD__ ); - return true; - } - - public static function runForTitleInternal( Title $title, Revision $revision, $fname ) { - global $wgParser, $wgContLang; - - wfProfileIn( $fname . '-parse' ); - $options = ParserOptions::newFromUserAndLang( new User, $wgContLang ); - $parserOutput = $wgParser->parse( - $revision->getText(), $title, $options, true, true, $revision->getId() ); - wfProfileOut( $fname . '-parse' ); - - wfProfileIn( $fname . '-update' ); - $updates = $parserOutput->getSecondaryDataUpdates( $title, false ); - DataUpdate::runUpdates( $updates ); - wfProfileOut( $fname . '-update' ); - } -} - -/** - * Background job to update links for a given title. - * Newer version for high use templates. - * - * @ingroup JobQueue - */ -class RefreshLinksJob2 extends Job { - const MAX_TITLES_RUN = 10; - - function __construct( $title, $params, $id = 0 ) { - parent::__construct( 'refreshLinks2', $title, $params, $id ); - } - - /** - * Run a refreshLinks2 job - * @return boolean success - */ - function run() { - wfProfileIn( __METHOD__ ); - - $linkCache = LinkCache::singleton(); - $linkCache->clear(); - - if ( is_null( $this->title ) ) { - $this->error = "refreshLinks2: Invalid title"; - wfProfileOut( __METHOD__ ); - return false; - } elseif ( !isset( $this->params['start'] ) || !isset( $this->params['end'] ) ) { - $this->error = "refreshLinks2: Invalid params"; - wfProfileOut( __METHOD__ ); - return false; - } - - // Back compat for pre-r94435 jobs - $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks'; - - // Avoid slave lag when fetching templates - if ( isset( $this->params['masterPos'] ) ) { - $masterPos = $this->params['masterPos']; - } elseif ( wfGetLB()->getServerCount() > 1 ) { - $masterPos = wfGetLB()->getMasterPos(); - } else { - $masterPos = false; - } - - $titles = $this->title->getBacklinkCache()->getLinks( - $table, $this->params['start'], $this->params['end'] ); - - if ( $titles->count() > self::MAX_TITLES_RUN ) { - # We don't want to parse too many pages per job as it can starve other jobs. - # If there are too many pages to parse, break this up into smaller jobs. By passing - # in the master position here we can cut down on the time spent waiting for slaves to - # catch up by the runners handling these jobs since time will have passed between now - # and when they pop these jobs off the queue. - $start = 0; // batch start - $end = 0; // batch end - $bsize = 0; // batch size - $first = true; // first of batch - $jobs = array(); - foreach ( $titles as $title ) { - $start = $first ? $title->getArticleId() : $start; - $end = $title->getArticleId(); - $first = false; - if ( ++$bsize >= self::MAX_TITLES_RUN ) { - $jobs[] = new RefreshLinksJob2( $this->title, array( - 'table' => $table, - 'start' => $start, - 'end' => $end, - 'masterPos' => $masterPos - ) ); - $first = true; - $start = $end = $bsize = 0; - } - } - if ( $bsize > 0 ) { // group remaining pages into a job - $jobs[] = new RefreshLinksJob2( $this->title, array( - 'table' => $table, - 'start' => $start, - 'end' => $end, - 'masterPos' => $masterPos - ) ); - } - Job::batchInsert( $jobs ); - } elseif ( php_sapi_name() != 'cli' ) { - # Not suitable for page load triggered job running! - # Gracefully switch to refreshLinks jobs if this happens. - $jobs = array(); - foreach ( $titles as $title ) { - $jobs[] = new RefreshLinksJob( $title, array( 'masterPos' => $masterPos ) ); - } - Job::batchInsert( $jobs ); - } else { - # Wait for the DB of the current/next slave DB handle to catch up to the master. - # This way, we get the correct page_latest for templates or files that just changed - # milliseconds ago, having triggered this job to begin with. - if ( $masterPos ) { - wfGetLB()->waitFor( $masterPos ); - } - # Re-parse each page that transcludes this page and update their tracking links... - foreach ( $titles as $title ) { - $revision = Revision::newFromTitle( $title, false, Revision::READ_NORMAL ); - if ( !$revision ) { - $this->error = 'refreshLinks: Article not found "' . - $title->getPrefixedDBkey() . '"'; - continue; // skip this page - } - RefreshLinksJob::runForTitleInternal( $title, $revision, __METHOD__ ); - wfWaitForSlaves(); - } - } - - wfProfileOut( __METHOD__ ); - return true; - } -} diff --git a/includes/job/UploadFromUrlJob.php b/includes/job/UploadFromUrlJob.php deleted file mode 100644 index e06f68e4..00000000 --- a/includes/job/UploadFromUrlJob.php +++ /dev/null @@ -1,179 +0,0 @@ -upload = new UploadFromUrl(); - $this->upload->initialize( - $this->title->getText(), - $this->params['url'], - false - ); - $this->user = User::newFromName( $this->params['userName'] ); - - # Fetch the file - $status = $this->upload->fetchFile(); - if ( !$status->isOk() ) { - $this->leaveMessage( $status ); - return true; - } - - # Verify upload - $result = $this->upload->verifyUpload(); - if ( $result['status'] != UploadBase::OK ) { - $status = $this->upload->convertVerifyErrorToStatus( $result ); - $this->leaveMessage( $status ); - return true; - } - - # Check warnings - if ( !$this->params['ignoreWarnings'] ) { - $warnings = $this->upload->checkWarnings(); - if ( $warnings ) { - - # Stash the upload - $key = $this->upload->stashFile(); - - if ( $this->params['leaveMessage'] ) { - $this->user->leaveUserMessage( - wfMessage( 'upload-warning-subj' )->text(), - wfMessage( 'upload-warning-msg', - $key, - $this->params['url'] )->text() - ); - } else { - wfSetupSession( $this->params['sessionId'] ); - $this->storeResultInSession( 'Warning', - 'warnings', $warnings ); - session_write_close(); - } - - return true; - } - } - - # Perform the upload - $status = $this->upload->performUpload( - $this->params['comment'], - $this->params['pageText'], - $this->params['watch'], - $this->user - ); - $this->leaveMessage( $status ); - return true; - - } - - /** - * Leave a message on the user talk page or in the session according to - * $params['leaveMessage']. - * - * @param $status Status - */ - protected function leaveMessage( $status ) { - if ( $this->params['leaveMessage'] ) { - if ( $status->isGood() ) { - $this->user->leaveUserMessage( wfMessage( 'upload-success-subj' )->text(), - wfMessage( 'upload-success-msg', - $this->upload->getTitle()->getText(), - $this->params['url'] - )->text() ); - } else { - $this->user->leaveUserMessage( wfMessage( 'upload-failure-subj' )->text(), - wfMessage( 'upload-failure-msg', - $status->getWikiText(), - $this->params['url'] - )->text() ); - } - } else { - wfSetupSession( $this->params['sessionId'] ); - if ( $status->isOk() ) { - $this->storeResultInSession( 'Success', - 'filename', $this->upload->getLocalFile()->getName() ); - } else { - $this->storeResultInSession( 'Failure', - 'errors', $status->getErrorsArray() ); - } - session_write_close(); - } - } - - /** - * Store a result in the session data. Note that the caller is responsible - * for appropriate session_start and session_write_close calls. - * - * @param $result String: the result (Success|Warning|Failure) - * @param $dataKey String: the key of the extra data - * @param $dataValue Mixed: the extra data itself - */ - protected function storeResultInSession( $result, $dataKey, $dataValue ) { - $session =& self::getSessionData( $this->params['sessionKey'] ); - $session['result'] = $result; - $session[$dataKey] = $dataValue; - } - - /** - * Initialize the session data. Sets the intial result to queued. - */ - public function initializeSessionData() { - $session =& self::getSessionData( $this->params['sessionKey'] ); - $$session['result'] = 'Queued'; - } - - /** - * @param $key - * @return mixed - */ - public static function &getSessionData( $key ) { - if ( !isset( $_SESSION[self::SESSION_KEYNAME][$key] ) ) { - $_SESSION[self::SESSION_KEYNAME][$key] = array(); - } - return $_SESSION[self::SESSION_KEYNAME][$key]; - } -} diff --git a/includes/job/jobs/AssembleUploadChunksJob.php b/includes/job/jobs/AssembleUploadChunksJob.php new file mode 100644 index 00000000..c5dd9eaa --- /dev/null +++ b/includes/job/jobs/AssembleUploadChunksJob.php @@ -0,0 +1,118 @@ +removeDuplicates = true; + } + + public function run() { + $scope = RequestContext::importScopedSession( $this->params['session'] ); + $context = RequestContext::getMain(); + try { + $user = $context->getUser(); + if ( !$user->isLoggedIn() ) { + $this->setLastError( "Could not load the author user from session." ); + return false; + } + + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() ) + ); + + $upload = new UploadFromChunks( $user ); + $upload->continueChunks( + $this->params['filename'], + $this->params['filekey'], + $context->getRequest() + ); + + // Combine all of the chunks into a local file and upload that to a new stash file + $status = $upload->concatenateChunks(); + if ( !$status->isGood() ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status ) + ); + $this->setLastError( $status->getWikiText() ); + return false; + } + + // We have a new filekey for the fully concatenated file + $newFileKey = $upload->getLocalFile()->getFileKey(); + + // Remove the old stash file row and first chunk file + $upload->stash->removeFileNoAuth( $this->params['filekey'] ); + + // Build the image info array while we have the local reference handy + $apiMain = new ApiMain(); // dummy object (XXX) + $imageInfo = $upload->getImageInfo( $apiMain->getResult() ); + + // Cleanup any temporary local file + $upload->cleanupTempFile(); + + // Cache the info so the user doesn't have to wait forever to get the final info + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Success', + 'stage' => 'assembling', + 'filekey' => $newFileKey, + 'imageinfo' => $imageInfo, + 'status' => Status::newGood() + ) + ); + } catch ( MWException $e ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Failure', + 'stage' => 'assembling', + 'status' => Status::newFatal( 'api-error-stashfailed' ) + ) + ); + $this->setLastError( get_class( $e ) . ": " . $e->getText() ); + return false; + } + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + $info['params'] = array( 'filekey' => $info['params']['filekey'] ); + } + return $info; + } + + public function allowRetries() { + return false; + } +} diff --git a/includes/job/jobs/DoubleRedirectJob.php b/includes/job/jobs/DoubleRedirectJob.php new file mode 100644 index 00000000..05abeeef --- /dev/null +++ b/includes/job/jobs/DoubleRedirectJob.php @@ -0,0 +1,218 @@ +" + * @param $redirTitle Title: the title which has changed, redirects pointing to this title are fixed + * @param bool $destTitle Not used + */ + public static function fixRedirects( $reason, $redirTitle, $destTitle = false ) { + # Need to use the master to get the redirect table updated in the same transaction + $dbw = wfGetDB( DB_MASTER ); + $res = $dbw->select( + array( 'redirect', 'page' ), + array( 'page_namespace', 'page_title' ), + array( + 'page_id = rd_from', + 'rd_namespace' => $redirTitle->getNamespace(), + 'rd_title' => $redirTitle->getDBkey() + ), __METHOD__ ); + if ( !$res->numRows() ) { + return; + } + $jobs = array(); + foreach ( $res as $row ) { + $title = Title::makeTitle( $row->page_namespace, $row->page_title ); + if ( !$title ) { + continue; + } + + $jobs[] = new self( $title, array( + 'reason' => $reason, + 'redirTitle' => $redirTitle->getPrefixedDBkey() ) ); + # Avoid excessive memory usage + if ( count( $jobs ) > 10000 ) { + JobQueueGroup::singleton()->push( $jobs ); + $jobs = array(); + } + } + JobQueueGroup::singleton()->push( $jobs ); + } + + function __construct( $title, $params = false, $id = 0 ) { + parent::__construct( 'fixDoubleRedirect', $title, $params, $id ); + $this->reason = $params['reason']; + $this->redirTitle = Title::newFromText( $params['redirTitle'] ); + } + + /** + * @return bool + */ + function run() { + if ( !$this->redirTitle ) { + $this->setLastError( 'Invalid title' ); + return false; + } + + $targetRev = Revision::newFromTitle( $this->title, false, Revision::READ_LATEST ); + if ( !$targetRev ) { + wfDebug( __METHOD__.": target redirect already deleted, ignoring\n" ); + return true; + } + $content = $targetRev->getContent(); + $currentDest = $content ? $content->getRedirectTarget() : null; + if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) { + wfDebug( __METHOD__.": Redirect has changed since the job was queued\n" ); + return true; + } + + # Check for a suppression tag (used e.g. in periodically archived discussions) + $mw = MagicWord::get( 'staticredirect' ); + if ( $content->matchMagicWord( $mw ) ) { + wfDebug( __METHOD__.": skipping: suppressed with __STATICREDIRECT__\n" ); + return true; + } + + # Find the current final destination + $newTitle = self::getFinalDestination( $this->redirTitle ); + if ( !$newTitle ) { + wfDebug( __METHOD__.": skipping: single redirect, circular redirect or invalid redirect destination\n" ); + return true; + } + if ( $newTitle->equals( $this->redirTitle ) ) { + # The redirect is already right, no need to change it + # This can happen if the page was moved back (say after vandalism) + wfDebug( __METHOD__.": skipping, already good\n" ); + } + + # Preserve fragment (bug 14904) + $newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(), + $currentDest->getFragment(), $newTitle->getInterwiki() ); + + # Fix the text + $newContent = $content->updateRedirect( $newTitle ); + + if ( $newContent->equals( $content ) ) { + $this->setLastError( 'Content unchanged???' ); + return false; + } + + $user = $this->getUser(); + if ( !$user ) { + $this->setLastError( 'Invalid user' ); + return false; + } + + # Save it + global $wgUser; + $oldUser = $wgUser; + $wgUser = $user; + $article = WikiPage::factory( $this->title ); + $reason = wfMessage( 'double-redirect-fixed-' . $this->reason, + $this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText() + )->inContentLanguage()->text(); + $article->doEditContent( $newContent, $reason, EDIT_UPDATE | EDIT_SUPPRESS_RC, false, $user ); + $wgUser = $oldUser; + + return true; + } + + /** + * Get the final destination of a redirect + * + * @param $title Title + * + * @return bool if the specified title is not a redirect, or if it is a circular redirect + */ + public static function getFinalDestination( $title ) { + $dbw = wfGetDB( DB_MASTER ); + + $seenTitles = array(); # Circular redirect check + $dest = false; + + while ( true ) { + $titleText = $title->getPrefixedDBkey(); + if ( isset( $seenTitles[$titleText] ) ) { + wfDebug( __METHOD__, "Circular redirect detected, aborting\n" ); + return false; + } + $seenTitles[$titleText] = true; + + if ( $title->getInterwiki() ) { + // If the target is interwiki, we have to break early (bug 40352). + // Otherwise it will look up a row in the local page table + // with the namespace/page of the interwiki target which can cause + // unexpected results (e.g. X -> foo:Bar -> Bar -> .. ) + break; + } + + $row = $dbw->selectRow( + array( 'redirect', 'page' ), + array( 'rd_namespace', 'rd_title', 'rd_interwiki' ), + array( + 'rd_from=page_id', + 'page_namespace' => $title->getNamespace(), + 'page_title' => $title->getDBkey() + ), __METHOD__ ); + if ( !$row ) { + # No redirect from here, chain terminates + break; + } else { + $dest = $title = Title::makeTitle( $row->rd_namespace, $row->rd_title, '', $row->rd_interwiki ); + } + } + return $dest; + } + + /** + * Get a user object for doing edits, from a request-lifetime cache + * False will be returned if the user name specified in the + * 'double-redirect-fixer' message is invalid. + * + * @return User|bool + */ + function getUser() { + if ( !self::$user ) { + self::$user = User::newFromName( wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text() ); + # User::newFromName() can return false on a badly configured wiki. + if ( self::$user && !self::$user->isLoggedIn() ) { + self::$user->addToDatabase(); + } + } + return self::$user; + } +} diff --git a/includes/job/jobs/DuplicateJob.php b/includes/job/jobs/DuplicateJob.php new file mode 100644 index 00000000..524983b8 --- /dev/null +++ b/includes/job/jobs/DuplicateJob.php @@ -0,0 +1,59 @@ +getTitle(), $job->getParams(), $job->getId() ); + $djob->command = $job->getType(); + $djob->params = is_array( $djob->params ) ? $djob->params : array(); + $djob->params = array( 'isDuplicate' => true ) + $djob->params; + $djob->metadata = $job->metadata; + return $djob; + } + + public function run() { + return true; + } +} diff --git a/includes/job/jobs/EmaillingJob.php b/includes/job/jobs/EmaillingJob.php new file mode 100644 index 00000000..9fbf3124 --- /dev/null +++ b/includes/job/jobs/EmaillingJob.php @@ -0,0 +1,47 @@ +params['to'], + $this->params['from'], + $this->params['subj'], + $this->params['body'], + $this->params['replyto'] + ); + + return $status->isOK(); + } + +} diff --git a/includes/job/jobs/EnotifNotifyJob.php b/includes/job/jobs/EnotifNotifyJob.php new file mode 100644 index 00000000..2be05b63 --- /dev/null +++ b/includes/job/jobs/EnotifNotifyJob.php @@ -0,0 +1,58 @@ +params['editorID'] ) && $this->params['editorID'] ) { + $editor = User::newFromId( $this->params['editorID'] ); + // B/C, only the name might be given. + } else { + # FIXME: newFromName could return false on a badly configured wiki. + $editor = User::newFromName( $this->params['editor'], false ); + } + $enotif->actuallyNotifyOnPageChange( + $editor, + $this->title, + $this->params['timestamp'], + $this->params['summary'], + $this->params['minorEdit'], + $this->params['oldid'], + $this->params['watchers'], + $this->params['pageStatus'] + ); + return true; + } + +} diff --git a/includes/job/jobs/HTMLCacheUpdateJob.php b/includes/job/jobs/HTMLCacheUpdateJob.php new file mode 100644 index 00000000..818c6abf --- /dev/null +++ b/includes/job/jobs/HTMLCacheUpdateJob.php @@ -0,0 +1,254 @@ +rowsPerJob = $wgUpdateRowsPerJob; + $this->rowsPerQuery = $wgUpdateRowsPerQuery; + $this->blCache = $title->getBacklinkCache(); + } + + public function run() { + if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) { + # This is hit when a job is actually performed + return $this->doPartialUpdate(); + } else { + # This is hit when the jobs have to be inserted + return $this->doFullUpdate(); + } + } + + /** + * Update all of the backlinks + */ + protected function doFullUpdate() { + # Get an estimate of the number of rows from the BacklinkCache + $numRows = $this->blCache->getNumLinks( $this->params['table'] ); + if ( $numRows > $this->rowsPerJob * 2 ) { + # Do fast cached partition + $this->insertPartitionJobs(); + } else { + # Get the links from the DB + $titleArray = $this->blCache->getLinks( $this->params['table'] ); + # Check if the row count estimate was correct + if ( $titleArray->count() > $this->rowsPerJob * 2 ) { + # Not correct, do accurate partition + wfDebug( __METHOD__.": row count estimate was incorrect, repartitioning\n" ); + $this->insertJobsFromTitles( $titleArray ); + } else { + $this->invalidateTitles( $titleArray ); // just do the query + } + } + return true; + } + + /** + * Update some of the backlinks, defined by a page ID range + */ + protected function doPartialUpdate() { + $titleArray = $this->blCache->getLinks( + $this->params['table'], $this->params['start'], $this->params['end'] ); + if ( $titleArray->count() <= $this->rowsPerJob * 2 ) { + # This partition is small enough, do the update + $this->invalidateTitles( $titleArray ); + } else { + # Partitioning was excessively inaccurate. Divide the job further. + # This can occur when a large number of links are added in a short + # period of time, say by updating a heavily-used template. + $this->insertJobsFromTitles( $titleArray ); + } + return true; + } + + /** + * Partition the current range given by $this->params['start'] and $this->params['end'], + * using a pre-calculated title array which gives the links in that range. + * Queue the resulting jobs. + * + * @param $titleArray array + * @param $rootJobParams array + * @return void + */ + protected function insertJobsFromTitles( $titleArray, $rootJobParams = array() ) { + // Carry over any "root job" information + $rootJobParams = $this->getRootJobParams(); + # We make subpartitions in the sense that the start of the first job + # will be the start of the parent partition, and the end of the last + # job will be the end of the parent partition. + $jobs = array(); + $start = $this->params['start']; # start of the current job + $numTitles = 0; + foreach ( $titleArray as $title ) { + $id = $title->getArticleID(); + # $numTitles is now the number of titles in the current job not + # including the current ID + if ( $numTitles >= $this->rowsPerJob ) { + # Add a job up to but not including the current ID + $jobs[] = new HTMLCacheUpdateJob( $this->title, + array( + 'table' => $this->params['table'], + 'start' => $start, + 'end' => $id - 1 + ) + $rootJobParams // carry over information for de-duplication + ); + $start = $id; + $numTitles = 0; + } + $numTitles++; + } + # Last job + $jobs[] = new HTMLCacheUpdateJob( $this->title, + array( + 'table' => $this->params['table'], + 'start' => $start, + 'end' => $this->params['end'] + ) + $rootJobParams // carry over information for de-duplication + ); + wfDebug( __METHOD__.": repartitioning into " . count( $jobs ) . " jobs\n" ); + + if ( count( $jobs ) < 2 ) { + # I don't think this is possible at present, but handling this case + # makes the code a bit more robust against future code updates and + # avoids a potential infinite loop of repartitioning + wfDebug( __METHOD__.": repartitioning failed!\n" ); + $this->invalidateTitles( $titleArray ); + } else { + JobQueueGroup::singleton()->push( $jobs ); + } + } + + /** + * @param $rootJobParams array + * @return void + */ + protected function insertPartitionJobs( $rootJobParams = array() ) { + // Carry over any "root job" information + $rootJobParams = $this->getRootJobParams(); + + $batches = $this->blCache->partition( $this->params['table'], $this->rowsPerJob ); + if ( !count( $batches ) ) { + return; // no jobs to insert + } + + $jobs = array(); + foreach ( $batches as $batch ) { + list( $start, $end ) = $batch; + $jobs[] = new HTMLCacheUpdateJob( $this->title, + array( + 'table' => $this->params['table'], + 'start' => $start, + 'end' => $end, + ) + $rootJobParams // carry over information for de-duplication + ); + } + + JobQueueGroup::singleton()->push( $jobs ); + } + + /** + * Invalidate an array (or iterator) of Title objects, right now + * @param $titleArray array + */ + protected function invalidateTitles( $titleArray ) { + global $wgUseFileCache, $wgUseSquid; + + $dbw = wfGetDB( DB_MASTER ); + $timestamp = $dbw->timestamp(); + + # Get all IDs in this query into an array + $ids = array(); + foreach ( $titleArray as $title ) { + $ids[] = $title->getArticleID(); + } + + if ( !$ids ) { + return; + } + + # Don't invalidated pages that were already invalidated + $touchedCond = isset( $this->params['rootJobTimestamp'] ) + ? array( "page_touched < " . + $dbw->addQuotes( $dbw->timestamp( $this->params['rootJobTimestamp'] ) ) ) + : array(); + + # Update page_touched + $batches = array_chunk( $ids, $this->rowsPerQuery ); + foreach ( $batches as $batch ) { + $dbw->update( 'page', + array( 'page_touched' => $timestamp ), + array( 'page_id' => $batch ) + $touchedCond, + __METHOD__ + ); + } + + # Update squid + if ( $wgUseSquid ) { + $u = SquidUpdate::newFromTitles( $titleArray ); + $u->doUpdate(); + } + + # Update file cache + if ( $wgUseFileCache ) { + foreach ( $titleArray as $title ) { + HTMLFileCache::clearFileCache( $title ); + } + } + } +} diff --git a/includes/job/jobs/NullJob.php b/includes/job/jobs/NullJob.php new file mode 100644 index 00000000..d282a8e6 --- /dev/null +++ b/includes/job/jobs/NullJob.php @@ -0,0 +1,60 @@ +params['lives'] ) ) { + $this->params['lives'] = 1; + } + if ( !isset( $this->params['usleep'] ) ) { + $this->params['usleep'] = 0; + } + $this->removeDuplicates = !empty( $this->params['removeDuplicates'] ); + } + + public function run() { + if ( $this->params['usleep'] > 0 ) { + usleep( $this->params['usleep'] ); + } + if ( $this->params['lives'] > 1 ) { + $params = $this->params; + $params['lives']--; + $job = new self( $this->title, $params ); + JobQueueGroup::singleton()->push( $job ); + } + return true; + } +} diff --git a/includes/job/jobs/PublishStashedFileJob.php b/includes/job/jobs/PublishStashedFileJob.php new file mode 100644 index 00000000..d3feda28 --- /dev/null +++ b/includes/job/jobs/PublishStashedFileJob.php @@ -0,0 +1,130 @@ +removeDuplicates = true; + } + + public function run() { + $scope = RequestContext::importScopedSession( $this->params['session'] ); + $context = RequestContext::getMain(); + try { + $user = $context->getUser(); + if ( !$user->isLoggedIn() ) { + $this->setLastError( "Could not load the author user from session." ); + return false; + } + + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() ) + ); + + $upload = new UploadFromStash( $user ); + // @TODO: initialize() causes a GET, ideally we could frontload the antivirus + // checks and anything else to the stash stage (which includes concatenation and + // the local file is thus already there). That way, instead of GET+PUT, there could + // just be a COPY operation from the stash to the public zone. + $upload->initialize( $this->params['filekey'], $this->params['filename'] ); + + // Check if the local file checks out (this is generally a no-op) + $verification = $upload->verifyUpload(); + if ( $verification['status'] !== UploadBase::OK ) { + $status = Status::newFatal( 'verification-error' ); + $status->value = array( 'verification' => $verification ); + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) + ); + $this->setLastError( "Could not verify upload." ); + return false; + } + + // Upload the stashed file to a permanent location + $status = $upload->performUpload( + $this->params['comment'], + $this->params['text'], + $this->params['watch'], + $user + ); + if ( !$status->isGood() ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ) + ); + $this->setLastError( $status->getWikiText() ); + return false; + } + + // Build the image info array while we have the local reference handy + $apiMain = new ApiMain(); // dummy object (XXX) + $imageInfo = $upload->getImageInfo( $apiMain->getResult() ); + + // Cleanup any temporary local file + $upload->cleanupTempFile(); + + // Cache the info so the user doesn't have to wait forever to get the final info + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Success', + 'stage' => 'publish', + 'filename' => $upload->getLocalFile()->getName(), + 'imageinfo' => $imageInfo, + 'status' => Status::newGood() + ) + ); + } catch ( MWException $e ) { + UploadBase::setSessionStatus( + $this->params['filekey'], + array( + 'result' => 'Failure', + 'stage' => 'publish', + 'status' => Status::newFatal( 'api-error-publishfailed' ) + ) + ); + $this->setLastError( get_class( $e ) . ": " . $e->getText() ); + return false; + } + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + $info['params'] = array( 'filekey' => $info['params']['filekey'] ); + } + return $info; + } + + public function allowRetries() { + return false; + } +} diff --git a/includes/job/jobs/RefreshLinksJob.php b/includes/job/jobs/RefreshLinksJob.php new file mode 100644 index 00000000..9dbe8278 --- /dev/null +++ b/includes/job/jobs/RefreshLinksJob.php @@ -0,0 +1,226 @@ +removeDuplicates = true; // job is expensive + } + + /** + * Run a refreshLinks job + * @return boolean success + */ + function run() { + wfProfileIn( __METHOD__ ); + + $linkCache = LinkCache::singleton(); + $linkCache->clear(); + + if ( is_null( $this->title ) ) { + $this->error = "refreshLinks: Invalid title"; + wfProfileOut( __METHOD__ ); + return false; + } + + # Wait for the DB of the current/next slave DB handle to catch up to the master. + # This way, we get the correct page_latest for templates or files that just changed + # milliseconds ago, having triggered this job to begin with. + if ( isset( $this->params['masterPos'] ) ) { + wfGetLB()->waitFor( $this->params['masterPos'] ); + } + + $revision = Revision::newFromTitle( $this->title, false, Revision::READ_NORMAL ); + if ( !$revision ) { + $this->error = 'refreshLinks: Article not found "' . + $this->title->getPrefixedDBkey() . '"'; + wfProfileOut( __METHOD__ ); + return false; // XXX: what if it was just deleted? + } + + self::runForTitleInternal( $this->title, $revision, __METHOD__ ); + + wfProfileOut( __METHOD__ ); + return true; + } + + /** + * @return Array + */ + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + // Don't let highly unique "masterPos" values ruin duplicate detection + if ( is_array( $info['params'] ) ) { + unset( $info['params']['masterPos'] ); + } + return $info; + } + + /** + * @param $title Title + * @param $revision Revision + * @param $fname string + * @return void + */ + public static function runForTitleInternal( Title $title, Revision $revision, $fname ) { + wfProfileIn( $fname ); + $content = $revision->getContent( Revision::RAW ); + + if ( !$content ) { + // if there is no content, pretend the content is empty + $content = $revision->getContentHandler()->makeEmptyContent(); + } + + // Revision ID must be passed to the parser output to get revision variables correct + $parserOutput = $content->getParserOutput( $title, $revision->getId(), null, false ); + + $updates = $content->getSecondaryDataUpdates( $title, null, false, $parserOutput ); + DataUpdate::runUpdates( $updates ); + wfProfileOut( $fname ); + } +} + +/** + * Background job to update links for a given title. + * Newer version for high use templates. + * + * @ingroup JobQueue + */ +class RefreshLinksJob2 extends Job { + function __construct( $title, $params, $id = 0 ) { + parent::__construct( 'refreshLinks2', $title, $params, $id ); + } + + /** + * Run a refreshLinks2 job + * @return boolean success + */ + function run() { + global $wgUpdateRowsPerJob; + + wfProfileIn( __METHOD__ ); + + $linkCache = LinkCache::singleton(); + $linkCache->clear(); + + if ( is_null( $this->title ) ) { + $this->error = "refreshLinks2: Invalid title"; + wfProfileOut( __METHOD__ ); + return false; + } + + // Back compat for pre-r94435 jobs + $table = isset( $this->params['table'] ) ? $this->params['table'] : 'templatelinks'; + + // Avoid slave lag when fetching templates. + // When the outermost job is run, we know that the caller that enqueued it must have + // committed the relevant changes to the DB by now. At that point, record the master + // position and pass it along as the job recursively breaks into smaller range jobs. + // Hopefully, when leaf jobs are popped, the slaves will have reached that position. + if ( isset( $this->params['masterPos'] ) ) { + $masterPos = $this->params['masterPos']; + } elseif ( wfGetLB()->getServerCount() > 1 ) { + $masterPos = wfGetLB()->getMasterPos(); + } else { + $masterPos = false; + } + + $tbc = $this->title->getBacklinkCache(); + + $jobs = array(); // jobs to insert + if ( isset( $this->params['start'] ) && isset( $this->params['end'] ) ) { + # This is a partition job to trigger the insertion of leaf jobs... + $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); + } else { + # This is a base job to trigger the insertion of partitioned jobs... + if ( $tbc->getNumLinks( $table ) <= $wgUpdateRowsPerJob ) { + # Just directly insert the single per-title jobs + $jobs = array_merge( $jobs, $this->getSingleTitleJobs( $table, $masterPos ) ); + } else { + # Insert the partition jobs to make per-title jobs + foreach ( $tbc->partition( $table, $wgUpdateRowsPerJob ) as $batch ) { + list( $start, $end ) = $batch; + $jobs[] = new RefreshLinksJob2( $this->title, + array( + 'table' => $table, + 'start' => $start, + 'end' => $end, + 'masterPos' => $masterPos, + ) + $this->getRootJobParams() // carry over information for de-duplication + ); + } + } + } + + if ( count( $jobs ) ) { + JobQueueGroup::singleton()->push( $jobs ); + } + + wfProfileOut( __METHOD__ ); + return true; + } + + /** + * @param $table string + * @param $masterPos mixed + * @return Array + */ + protected function getSingleTitleJobs( $table, $masterPos ) { + # The "start"/"end" fields are not set for the base jobs + $start = isset( $this->params['start'] ) ? $this->params['start'] : false; + $end = isset( $this->params['end'] ) ? $this->params['end'] : false; + $titles = $this->title->getBacklinkCache()->getLinks( $table, $start, $end ); + # Convert into single page refresh links jobs. + # This handles well when in sapi mode and is useful in any case for job + # de-duplication. If many pages use template A, and that template itself + # uses template B, then an edit to both will create many duplicate jobs. + # Roughly speaking, for each page, one of the "RefreshLinksJob" jobs will + # get run first, and when it does, it will remove the duplicates. Of course, + # one page could have its job popped when the other page's job is still + # buried within the logic of a refreshLinks2 job. + $jobs = array(); + foreach ( $titles as $title ) { + $jobs[] = new RefreshLinksJob( $title, + array( 'masterPos' => $masterPos ) + $this->getRootJobParams() + ); // carry over information for de-duplication + } + return $jobs; + } + + /** + * @return Array + */ + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + // Don't let highly unique "masterPos" values ruin duplicate detection + if ( is_array( $info['params'] ) ) { + unset( $info['params']['masterPos'] ); + } + return $info; + } +} diff --git a/includes/job/jobs/UploadFromUrlJob.php b/includes/job/jobs/UploadFromUrlJob.php new file mode 100644 index 00000000..87549140 --- /dev/null +++ b/includes/job/jobs/UploadFromUrlJob.php @@ -0,0 +1,179 @@ +upload = new UploadFromUrl(); + $this->upload->initialize( + $this->title->getText(), + $this->params['url'], + false + ); + $this->user = User::newFromName( $this->params['userName'] ); + + # Fetch the file + $status = $this->upload->fetchFile(); + if ( !$status->isOk() ) { + $this->leaveMessage( $status ); + return true; + } + + # Verify upload + $result = $this->upload->verifyUpload(); + if ( $result['status'] != UploadBase::OK ) { + $status = $this->upload->convertVerifyErrorToStatus( $result ); + $this->leaveMessage( $status ); + return true; + } + + # Check warnings + if ( !$this->params['ignoreWarnings'] ) { + $warnings = $this->upload->checkWarnings(); + if ( $warnings ) { + + # Stash the upload + $key = $this->upload->stashFile(); + + if ( $this->params['leaveMessage'] ) { + $this->user->leaveUserMessage( + wfMessage( 'upload-warning-subj' )->text(), + wfMessage( 'upload-warning-msg', + $key, + $this->params['url'] )->text() + ); + } else { + wfSetupSession( $this->params['sessionId'] ); + $this->storeResultInSession( 'Warning', + 'warnings', $warnings ); + session_write_close(); + } + + return true; + } + } + + # Perform the upload + $status = $this->upload->performUpload( + $this->params['comment'], + $this->params['pageText'], + $this->params['watch'], + $this->user + ); + $this->leaveMessage( $status ); + return true; + + } + + /** + * Leave a message on the user talk page or in the session according to + * $params['leaveMessage']. + * + * @param $status Status + */ + protected function leaveMessage( $status ) { + if ( $this->params['leaveMessage'] ) { + if ( $status->isGood() ) { + $this->user->leaveUserMessage( wfMessage( 'upload-success-subj' )->text(), + wfMessage( 'upload-success-msg', + $this->upload->getTitle()->getText(), + $this->params['url'] + )->text() ); + } else { + $this->user->leaveUserMessage( wfMessage( 'upload-failure-subj' )->text(), + wfMessage( 'upload-failure-msg', + $status->getWikiText(), + $this->params['url'] + )->text() ); + } + } else { + wfSetupSession( $this->params['sessionId'] ); + if ( $status->isOk() ) { + $this->storeResultInSession( 'Success', + 'filename', $this->upload->getLocalFile()->getName() ); + } else { + $this->storeResultInSession( 'Failure', + 'errors', $status->getErrorsArray() ); + } + session_write_close(); + } + } + + /** + * Store a result in the session data. Note that the caller is responsible + * for appropriate session_start and session_write_close calls. + * + * @param string $result the result (Success|Warning|Failure) + * @param string $dataKey the key of the extra data + * @param $dataValue Mixed: the extra data itself + */ + protected function storeResultInSession( $result, $dataKey, $dataValue ) { + $session =& self::getSessionData( $this->params['sessionKey'] ); + $session['result'] = $result; + $session[$dataKey] = $dataValue; + } + + /** + * Initialize the session data. Sets the intial result to queued. + */ + public function initializeSessionData() { + $session =& self::getSessionData( $this->params['sessionKey'] ); + $$session['result'] = 'Queued'; + } + + /** + * @param $key + * @return mixed + */ + public static function &getSessionData( $key ) { + if ( !isset( $_SESSION[self::SESSION_KEYNAME][$key] ) ) { + $_SESSION[self::SESSION_KEYNAME][$key] = array(); + } + return $_SESSION[self::SESSION_KEYNAME][$key]; + } +} -- cgit v1.2.2