summaryrefslogtreecommitdiff
path: root/maintenance/nextJobDB.php
diff options
context:
space:
mode:
Diffstat (limited to 'maintenance/nextJobDB.php')
-rw-r--r--maintenance/nextJobDB.php85
1 files changed, 67 insertions, 18 deletions
diff --git a/maintenance/nextJobDB.php b/maintenance/nextJobDB.php
index 8d8229e2..2d3608d5 100644
--- a/maintenance/nextJobDB.php
+++ b/maintenance/nextJobDB.php
@@ -33,20 +33,72 @@ class nextJobDB extends Maintenance {
public function execute() {
global $wgMemc;
$type = $this->getOption( 'type', false );
- $mckey = $type === false
- ? "jobqueue:dbs"
- : "jobqueue:dbs:$type";
- $pendingDBs = $wgMemc->get( $mckey );
- # If we didn't get it from the cache
+ $memcKey = 'jobqueue:dbs:v2';
+ $pendingDBs = $wgMemc->get( $memcKey );
+
+ // If the cache entry wasn't present, or in 1% of cases otherwise,
+ // regenerate the cache.
+ if ( !$pendingDBs || mt_rand( 0, 100 ) == 0 ) {
+ $pendingDBs = $this->getPendingDbs();
+ $wgMemc->set( $memcKey, $pendingDBs, 300 );
+ }
+
if ( !$pendingDBs ) {
- $pendingDBs = $this->getPendingDbs( $type );
- $wgMemc->set( $mckey, $pendingDBs, 300 );
+ return;
}
- # If we've got a pending job in a db, display it.
- if ( $pendingDBs ) {
- $this->output( $pendingDBs[mt_rand( 0, count( $pendingDBs ) - 1 )] );
+
+ do {
+ $again = false;
+
+ if ( $type === false ) {
+ $candidates = call_user_func_array( 'array_merge', $pendingDBs );
+ } elseif ( isset( $pendingDBs[$type] ) ) {
+ $candidates = $pendingDBs[$type];
+ } else {
+ $candidates = array();
+ }
+ if ( !$candidates ) {
+ return;
+ }
+
+ $candidates = array_values( $candidates );
+ $db = $candidates[ mt_rand( 0, count( $candidates ) - 1 ) ];
+ if ( !$this->checkJob( $type, $db ) ) {
+ // This job is not available in the current database. Remove it from
+ // the cache.
+ if ( $type === false ) {
+ foreach ( $pendingDBs as $type2 => $dbs ) {
+ $pendingDBs[$type2] = array_diff( $pendingDBs[$type2], array( $db ) );
+ }
+ } else {
+ $pendingDBs[$type] = array_diff( $pendingDBs[$type], array( $db ) );
+ }
+
+ $wgMemc->set( $memcKey, $pendingDBs, 300 );
+ $again = true;
+ }
+ } while ( $again );
+
+ $this->output( $db . "\n" );
+ }
+
+ /**
+ * Check if the specified database has a job of the specified type in it.
+ * The type may be false to indicate "all".
+ */
+ function checkJob( $type, $dbName ) {
+ $lb = wfGetLB( $dbName );
+ $db = $lb->getConnection( DB_MASTER, array(), $dbName );
+ if ( $type === false ) {
+ $conds = array();
+ } else {
+ $conds = array( 'job_cmd' => $type );
}
+
+ $exists = (bool) $db->selectField( 'job', '1', $conds, __METHOD__ );
+ $lb->reuseConnection( $db );
+ return $exists;
}
/**
@@ -54,7 +106,7 @@ class nextJobDB extends Maintenance {
* @param $type String Job type
* @return array
*/
- private function getPendingDbs( $type ) {
+ private function getPendingDbs() {
global $wgLocalDatabases;
$pendingDBs = array();
# Cross-reference DBs by master DB server
@@ -66,10 +118,10 @@ class nextJobDB extends Maintenance {
foreach ( $dbsByMaster as $dbs ) {
$dbConn = wfGetDB( DB_MASTER, array(), $dbs[0] );
- $stype = $dbConn->addQuotes( $type );
# Padding row for MySQL bug
- $sql = "(SELECT '-------------------------------------------' as db)";
+ $pad = str_repeat( '-', 40 );
+ $sql = "(SELECT '$pad' as db, '$pad' as job_cmd)";
foreach ( $dbs as $wikiId ) {
if ( $sql != '' ) {
$sql .= ' UNION ';
@@ -79,10 +131,7 @@ class nextJobDB extends Maintenance {
$dbConn->tablePrefix( $tablePrefix );
$jobTable = $dbConn->tableName( 'job' );
- if ( $type === false )
- $sql .= "(SELECT '$wikiId' as db FROM $dbName.$jobTable LIMIT 1)";
- else
- $sql .= "(SELECT '$wikiId' as db FROM $dbName.$jobTable WHERE job_cmd=$stype LIMIT 1)";
+ $sql .= "(SELECT DISTINCT '$wikiId' as db, job_cmd FROM $dbName.$jobTable GROUP BY job_cmd)";
}
$res = $dbConn->query( $sql, __METHOD__ );
$first = true;
@@ -92,7 +141,7 @@ class nextJobDB extends Maintenance {
$first = false;
continue;
}
- $pendingDBs[] = $row->db;
+ $pendingDBs[$row->job_cmd][] = $row->db;
}
}
return $pendingDBs;