summaryrefslogtreecommitdiff
path: root/maintenance/nextJobDB.php
diff options
context:
space:
mode:
Diffstat (limited to 'maintenance/nextJobDB.php')
-rw-r--r--maintenance/nextJobDB.php148
1 files changed, 53 insertions, 95 deletions
diff --git a/maintenance/nextJobDB.php b/maintenance/nextJobDB.php
index e66e981b..1be5146e 100644
--- a/maintenance/nextJobDB.php
+++ b/maintenance/nextJobDB.php
@@ -18,7 +18,6 @@
* http://www.gnu.org/copyleft/gpl.html
*
* @file
- * @todo Make this work on PostgreSQL and maybe other database servers
* @ingroup Maintenance
*/
@@ -33,127 +32,86 @@ class nextJobDB extends Maintenance {
public function __construct() {
parent::__construct();
$this->mDescription = "Pick a database that has pending jobs";
- $this->addOption( 'type', "The type of job to search for", false, true );
+ $this->addOption( 'type', "Search by job type", false, true );
+ $this->addOption( 'types', "Space separated list of job types to search for", false, true );
}
public function execute() {
- global $wgMemc;
- $type = $this->getOption( 'type', false );
+ global $wgJobTypesExcludedFromDefaultQueue;
- $memcKey = 'jobqueue:dbs:v2';
- $pendingDBs = $wgMemc->get( $memcKey );
-
- // If the cache entry wasn't present, or in 1% of cases otherwise,
- // regenerate the cache.
- if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) {
- $pendingDBs = $this->getPendingDbs();
- $wgMemc->set( $memcKey, $pendingDBs, 300 );
+ // job type required/picked
+ if ( $this->hasOption( 'types' ) ) {
+ $types = explode( ' ', $this->getOption( 'types' ) );
+ } elseif ( $this->hasOption( 'type' ) ) {
+ $types = array( $this->getOption( 'type' ) );
+ } else {
+ $types = false;
}
- if ( !$pendingDBs ) {
- return;
+ // Handle any required periodic queue maintenance
+ $this->executeReadyPeriodicTasks();
+
+ // Get all the queues with jobs in them
+ $pendingDBs = JobQueueAggregator::singleton()->getAllReadyWikiQueues();
+ if ( !count( $pendingDBs ) ) {
+ return; // no DBs with jobs or cache is both empty and locked
}
do {
$again = false;
- if ( $type === false ) {
- $candidates = call_user_func_array( 'array_merge', $pendingDBs );
- } elseif ( isset( $pendingDBs[$type] ) ) {
- $candidates = $pendingDBs[$type];
- } else {
- $candidates = array();
- }
- if ( !$candidates ) {
- return;
- }
-
- $candidates = array_values( $candidates );
- $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];
- if ( !$this->checkJob( $type, $db ) ) {
- // This job is not available in the current database. Remove it from
- // the cache.
- if ( $type === false ) {
- foreach ( $pendingDBs as $type2 => $dbs ) {
- $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) );
+ $candidates = array(); // list of (type, db)
+ // Flatten the tree of candidates into a flat list so that a random
+ // item can be selected, weighing each queue (type/db tuple) equally.
+ foreach ( $pendingDBs as $type => $dbs ) {
+ if (
+ ( is_array( $types ) && in_array( $type, $types ) ) ||
+ ( $types === false && !in_array( $type, $wgJobTypesExcludedFromDefaultQueue ) )
+ ) {
+ foreach ( $dbs as $db ) {
+ $candidates[] = array( $type, $db );
}
- } else {
- $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
}
+ }
+ if ( !count( $candidates ) ) {
+ return; // no jobs for this type
+ }
- $wgMemc->set( $memcKey, $pendingDBs, 300 );
+ list( $type, $db ) = $candidates[mt_rand( 0, count( $candidates ) - 1 )];
+ if ( JobQueueGroup::singleton( $db )->isQueueDeprioritized( $type ) ) {
+ $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
$again = true;
}
} while ( $again );
- $this->output( $db . "\n" );
- }
-
- /**
- * Check if the specified database has a job of the specified type in it.
- * The type may be false to indicate "all".
- * @param $type string
- * @param $dbName string
- * @return bool
- */
- function checkJob( $type, $dbName ) {
- $lb = wfGetLB( $dbName );
- $db = $lb->getConnection( DB_MASTER, array(), $dbName );
- if ( $type === false ) {
- $conds = Job::defaultQueueConditions( );
+ if ( $this->hasOption( 'types' ) ) {
+ $this->output( $db . " " . $type . "\n" );
} else {
- $conds = array( 'job_cmd' => $type );
+ $this->output( $db . "\n" );
}
-
-
- $exists = (bool) $db->selectField( 'job', '1', $conds, __METHOD__ );
- $lb->reuseConnection( $db );
- return $exists;
}
/**
- * Get all databases that have a pending job
- * @return array
+ * Do all ready periodic jobs for all databases every 5 minutes (and .1% of the time)
+ * @return integer
*/
- private function getPendingDbs() {
- global $wgLocalDatabases;
- $pendingDBs = array();
- # Cross-reference DBs by master DB server
- $dbsByMaster = array();
- foreach ( $wgLocalDatabases as $db ) {
- $lb = wfGetLB( $db );
- $dbsByMaster[$lb->getServerName( 0 )][] = $db;
- }
-
- foreach ( $dbsByMaster as $dbs ) {
- $dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] );
-
- # Padding row for MySQL bug
- $pad = str_repeat( '-', 40 );
- $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)";
- foreach ( $dbs as $wikiId ) {
- if ( $sql != '' ) {
- $sql .= ' UNION ';
+ private function executeReadyPeriodicTasks() {
+ global $wgLocalDatabases, $wgMemc;
+
+ $count = 0;
+ $memcKey = 'jobqueue:periodic:lasttime';
+ $timestamp = (int)$wgMemc->get( $memcKey ); // UNIX timestamp or 0
+ if ( ( time() - $timestamp ) > 300 || mt_rand( 0, 999 ) == 0 ) { // 5 minutes
+ if ( $wgMemc->add( "$memcKey:rebuild", 1, 1800 ) ) { // lock
+ foreach ( $wgLocalDatabases as $db ) {
+ $count += JobQueueGroup::singleton( $db )->executeReadyPeriodicTasks();
}
-
- list( $dbName, $tablePrefix ) = wfSplitWikiID( $wikiId );
- $dbConn->tablePrefix( $tablePrefix );
- $jobTable = $dbConn->tableName( 'job' );
-
- $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)";
- }
- $res = $dbConn->query( $sql, __METHOD__ );
- $first = true;
- foreach ( $res as $row ) {
- if ( $first ) {
- // discard padding row
- $first = false;
- continue;
- }
- $pendingDBs[$row->job_cmd][] = $row->db;
+ $wgMemc->set( $memcKey, time() );
+ $wgMemc->delete( "$memcKey:rebuild" ); // unlock
}
}
- return $pendingDBs;
+
+ return $count;
}
}