summaryrefslogtreecommitdiff
path: root/includes/jobqueue/JobRunner.php
diff options
context:
space:
mode:
Diffstat (limited to 'includes/jobqueue/JobRunner.php')
-rw-r--r--includes/jobqueue/JobRunner.php86
1 files changed, 71 insertions, 15 deletions
diff --git a/includes/jobqueue/JobRunner.php b/includes/jobqueue/JobRunner.php
index 8cccedaf..b8c5d6cf 100644
--- a/includes/jobqueue/JobRunner.php
+++ b/includes/jobqueue/JobRunner.php
@@ -21,13 +21,17 @@
* @ingroup JobQueue
*/
+use MediaWiki\Logger\LoggerFactory;
+use Psr\Log\LoggerAwareInterface;
+use Psr\Log\LoggerInterface;
+
/**
* Job queue runner utility methods
*
* @ingroup JobQueue
* @since 1.24
*/
-class JobRunner {
+class JobRunner implements LoggerAwareInterface {
/** @var callable|null Debug output handler */
protected $debug;
@@ -39,6 +43,28 @@ class JobRunner {
}
/**
+ * @var LoggerInterface $logger
+ */
+ protected $logger;
+
+ /**
+ * @param LoggerInterface $logger
+ */
+ public function setLogger( LoggerInterface $logger ) {
+ $this->logger = $logger;
+ }
+
+ /**
+ * @param LoggerInterface $logger
+ */
+ public function __construct( LoggerInterface $logger = null ) {
+ if ( $logger === null ) {
+ $logger = LoggerFactory::getInstance( 'runJobs' );
+ }
+ $this->setLogger( $logger );
+ }
+
+ /**
* Run jobs of the specified number/type for the specified time
*
* The response map has a 'job' field that lists status of each job, including:
@@ -62,6 +88,8 @@ class JobRunner {
* @return array Summary response that can easily be JSON serialized
*/
public function run( array $options ) {
+ global $wgJobClasses;
+
$response = array( 'jobs' => array(), 'reached' => 'none-ready' );
$type = isset( $options['type'] ) ? $options['type'] : false;
@@ -69,11 +97,31 @@ class JobRunner {
$maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false;
$noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
+ if ( $type !== false && !isset( $wgJobClasses[$type] ) ) {
+ $response['reached'] = 'none-possible';
+ return $response;
+ }
+
$group = JobQueueGroup::singleton();
// Handle any required periodic queue maintenance
$count = $group->executeReadyPeriodicTasks();
if ( $count > 0 ) {
- $this->runJobsLog( "Executed $count periodic queue task(s)." );
+ $msg = "Executed $count periodic queue task(s).";
+ $this->logger->debug( $msg );
+ $this->debugCallback( $msg );
+ }
+
+ // Bail out if in read-only mode
+ if ( wfReadOnly() ) {
+ $response['reached'] = 'read-only';
+ return $response;
+ }
+
+ // Bail out if there is too much DB lag
+ list( , $maxLag ) = wfGetLBFactory()->getMainLB( wfWikiID() )->getMaxLag();
+ if ( $maxLag >= 5 ) {
+ $response['reached'] = 'slave-lag-limit';
+ return $response;
}
// Flush any pending DB writes for sanity
@@ -87,8 +135,10 @@ class JobRunner {
$jobsRun = 0;
$timeMsTotal = 0;
$flags = JobQueueGroup::USE_CACHE;
+ $checkPeriod = 5.0; // seconds
+ $checkPhase = mt_rand( 0, 1000 * $checkPeriod ) / 1000; // avoid stampedes
$startTime = microtime( true ); // time since jobs started running
- $lastTime = microtime( true ); // time since last slave check
+ $lastTime = microtime( true ) - $checkPhase; // time since last slave check
do {
// Sync the persistent backoffs with concurrent runners
$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
@@ -117,24 +167,24 @@ class JobRunner {
$backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
}
- $this->runJobsLog( $job->toString() . " STARTING" );
+ $msg = $job->toString() . " STARTING";
+ $this->logger->info( $msg );
+ $this->debugCallback( $msg );
// Run the job...
- wfProfileIn( __METHOD__ . '-' . get_class( $job ) );
$jobStartTime = microtime( true );
try {
++$jobsRun;
$status = $job->run();
$error = $job->getLastError();
wfGetLBFactory()->commitMasterChanges();
- } catch ( MWException $e ) {
+ } catch ( Exception $e ) {
MWExceptionHandler::rollbackMasterChangesAndLog( $e );
$status = false;
$error = get_class( $e ) . ': ' . $e->getMessage();
MWExceptionHandler::logException( $e );
}
$timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
- wfProfileOut( __METHOD__ . '-' . get_class( $job ) );
$timeMsTotal += $timeMs;
// Mark the job as done on success or when the job cannot be retried
@@ -151,9 +201,13 @@ class JobRunner {
}
if ( $status === false ) {
- $this->runJobsLog( $job->toString() . " t=$timeMs error={$error}" );
+ $msg = $job->toString() . " t=$timeMs error={$error}";
+ $this->logger->error( $msg );
+ $this->debugCallback( $msg );
} else {
- $this->runJobsLog( $job->toString() . " t=$timeMs good" );
+ $msg = $job->toString() . " t=$timeMs good";
+ $this->logger->info( $msg );
+ $this->debugCallback( $msg );
}
$response['jobs'][] = array(
@@ -172,10 +226,15 @@ class JobRunner {
break;
}
- // Don't let any of the main DB slaves get backed up
+ // Don't let any of the main DB slaves get backed up.
+ // This only waits for so long before exiting and letting
+ // other wikis in the farm (on different masters) get a chance.
$timePassed = microtime( true ) - $lastTime;
if ( $timePassed >= 5 || $timePassed < 0 ) {
- wfWaitForSlaves( $lastTime );
+ if ( !wfWaitForSlaves( $lastTime, false, '*', 5 ) ) {
+ $response['reached'] = 'slave-lag-limit';
+ break;
+ }
$lastTime = microtime( true );
}
// Don't let any queue slaves/backups fall behind
@@ -239,7 +298,6 @@ class JobRunner {
* @return array Map of (job type => backoff expiry timestamp)
*/
private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
- $section = new ProfileSection( __METHOD__ );
$file = wfTempDir() . '/mw-runJobs-backoffs.json';
if ( is_file( $file ) ) {
@@ -278,7 +336,6 @@ class JobRunner {
* @return array The new backoffs account for $backoffs and the latest file data
*/
private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
- $section = new ProfileSection( __METHOD__ );
if ( !$deltas ) {
return $this->loadBackoffs( $backoffs, $mode );
@@ -341,10 +398,9 @@ class JobRunner {
* Log the job message
* @param string $msg The message to log
*/
- private function runJobsLog( $msg ) {
+ private function debugCallback( $msg ) {
if ( $this->debug ) {
call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) );
}
- wfDebugLog( 'runJobs', $msg );
}
}