diff options
Diffstat (limited to 'includes/db/LoadBalancer.php')
-rw-r--r-- | includes/db/LoadBalancer.php | 252 |
1 files changed, 156 insertions, 96 deletions
diff --git a/includes/db/LoadBalancer.php b/includes/db/LoadBalancer.php index 0e455e0c..857109db 100644 --- a/includes/db/LoadBalancer.php +++ b/includes/db/LoadBalancer.php @@ -37,14 +37,15 @@ class LoadBalancer { private $mLoadMonitorClass, $mLoadMonitor; /** - * @param $params Array with keys: + * @param array $params with keys: * servers Required. Array of server info structures. * masterWaitTimeout Replication lag wait timeout * loadMonitor Name of a class used to fetch server lag and load. + * @throws MWException */ function __construct( $params ) { if ( !isset( $params['servers'] ) ) { - throw new MWException( __CLASS__.': missing servers parameter' ); + throw new MWException( __CLASS__ . ': missing servers parameter' ); } $this->mServers = $params['servers']; @@ -77,7 +78,7 @@ class LoadBalancer { } } - foreach( $params['servers'] as $i => $server ) { + foreach ( $params['servers'] as $i => $server ) { $this->mLoads[$i] = $server['load']; if ( isset( $server['groupLoads'] ) ) { foreach ( $server['groupLoads'] as $group => $ratio ) { @@ -116,34 +117,14 @@ class LoadBalancer { * Given an array of non-normalised probabilities, this function will select * an element and return the appropriate key * + * @deprecated since 1.21, use ArrayUtils::pickRandom() + * * @param $weights array * - * @return int + * @return bool|int|string */ function pickRandom( $weights ) { - if ( !is_array( $weights ) || count( $weights ) == 0 ) { - return false; - } - - $sum = array_sum( $weights ); - if ( $sum == 0 ) { - # No loads on any of them - # In previous versions, this triggered an unweighted random selection, - # but this feature has been removed as of April 2006 to allow for strict - # separation of query groups. - return false; - } - $max = mt_getrandmax(); - $rand = mt_rand( 0, $max ) / $max * $sum; - - $sum = 0; - foreach ( $weights as $i => $w ) { - $sum += $w; - if ( $sum >= $rand ) { - break; - } - } - return $i; + return ArrayUtils::pickRandom( $weights ); } /** @@ -186,7 +167,7 @@ class LoadBalancer { #wfDebugLog( 'connect', var_export( $loads, true ) ); # Return a random representative of the remainder - return $this->pickRandom( $loads ); + return ArrayUtils::pickRandom( $loads ); } /** @@ -197,17 +178,18 @@ class LoadBalancer { * Side effect: opens connections to databases * @param $group bool * @param $wiki bool + * @throws MWException * @return bool|int|string */ function getReaderIndex( $group = false, $wiki = false ) { global $wgReadOnly, $wgDBClusterTimeout, $wgDBAvgStatusPoll, $wgDBtype; # @todo FIXME: For now, only go through all this for mysql databases - if ($wgDBtype != 'mysql') { + if ( $wgDBtype != 'mysql' ) { return $this->getWriterIndex(); } - if ( count( $this->mServers ) == 1 ) { + if ( count( $this->mServers ) == 1 ) { # Skip the load balancing if there's only one server return 0; } elseif ( $group === false and $this->mReadIndex >= 0 ) { @@ -228,7 +210,7 @@ class LoadBalancer { $nonErrorLoads = $this->mGroupLoads[$group]; } else { # No loads for this group, return false and the caller can use some other group - wfDebug( __METHOD__.": no loads for group $group\n" ); + wfDebug( __METHOD__ . ": no loads for group $group\n" ); wfProfileOut( __METHOD__ ); return false; } @@ -237,6 +219,7 @@ class LoadBalancer { } if ( !$nonErrorLoads ) { + wfProfileOut( __METHOD__ ); throw new MWException( "Empty server array given to LoadBalancer" ); } @@ -253,15 +236,15 @@ class LoadBalancer { $currentLoads = $nonErrorLoads; while ( count( $currentLoads ) ) { if ( $wgReadOnly || $this->mAllowLagged || $laggedSlaveMode ) { - $i = $this->pickRandom( $currentLoads ); + $i = ArrayUtils::pickRandom( $currentLoads ); } else { $i = $this->getRandomNonLagged( $currentLoads, $wiki ); - if ( $i === false && count( $currentLoads ) != 0 ) { + if ( $i === false && count( $currentLoads ) != 0 ) { # All slaves lagged. Switch to read-only mode wfDebugLog( 'replication', "All slaves lagged. Switch to read-only mode\n" ); $wgReadOnly = 'The database has been automatically locked ' . 'while the slave database servers catch up to the master'; - $i = $this->pickRandom( $currentLoads ); + $i = ArrayUtils::pickRandom( $currentLoads ); $laggedSlaveMode = true; } } @@ -270,16 +253,16 @@ class LoadBalancer { # pickRandom() returned false # This is permanent and means the configuration or the load monitor # wants us to return false. - wfDebugLog( 'connect', __METHOD__.": pickRandom() returned false\n" ); + wfDebugLog( 'connect', __METHOD__ . ": pickRandom() returned false\n" ); wfProfileOut( __METHOD__ ); return false; } - wfDebugLog( 'connect', __METHOD__.": Using reader #$i: {$this->mServers[$i]['host']}...\n" ); + wfDebugLog( 'connect', __METHOD__ . ": Using reader #$i: {$this->mServers[$i]['host']}...\n" ); $conn = $this->openConnection( $i, $wiki ); if ( !$conn ) { - wfDebugLog( 'connect', __METHOD__.": Failed connecting to $i/$wiki\n" ); + wfDebugLog( 'connect', __METHOD__ . ": Failed connecting to $i/$wiki\n" ); unset( $nonErrorLoads[$i] ); unset( $currentLoads[$i] ); continue; @@ -318,7 +301,7 @@ class LoadBalancer { # Some servers must have been overloaded if ( $overloadedServers == 0 ) { - throw new MWException( __METHOD__.": unexpectedly found no overloaded servers" ); + throw new MWException( __METHOD__ . ": unexpectedly found no overloaded servers" ); } # Back off for a while # Scale the sleep time by the number of connected threads, to produce a @@ -341,7 +324,7 @@ class LoadBalancer { $this->mServers[$i]['slave pos'] = $conn->getSlavePos(); } } - if ( $this->mReadIndex <=0 && $this->mLoads[$i]>0 && $i !== false ) { + if ( $this->mReadIndex <= 0 && $this->mLoads[$i] > 0 && $i !== false ) { $this->mReadIndex = $i; } } @@ -356,7 +339,7 @@ class LoadBalancer { */ function sleep( $t ) { wfProfileIn( __METHOD__ ); - wfDebug( __METHOD__.": waiting $t us\n" ); + wfDebug( __METHOD__ . ": waiting $t us\n" ); usleep( $t ); wfProfileOut( __METHOD__ ); return $t; @@ -366,7 +349,7 @@ class LoadBalancer { * Set the master wait position * If a DB_SLAVE connection has been opened already, waits * Otherwise sets a variable telling it to wait if such a connection is opened - * @param $pos int + * @param $pos DBMasterPos */ public function waitFor( $pos ) { wfProfileIn( __METHOD__ ); @@ -384,13 +367,13 @@ class LoadBalancer { /** * Set the master wait position and wait for ALL slaves to catch up to it - * @param $pos int + * @param $pos DBMasterPos */ public function waitForAll( $pos ) { wfProfileIn( __METHOD__ ); $this->mWaitForPos = $pos; for ( $i = 1; $i < count( $this->mServers ); $i++ ) { - $this->doWait( $i , true ); + $this->doWait( $i, true ); } wfProfileOut( __METHOD__ ); } @@ -417,7 +400,7 @@ class LoadBalancer { * @param $open bool * @return bool */ - function doWait( $index, $open = false ) { + protected function doWait( $index, $open = false ) { # Find a connection to wait on $conn = $this->getAnyOpenConnection( $index ); if ( !$conn ) { @@ -425,7 +408,7 @@ class LoadBalancer { wfDebug( __METHOD__ . ": no connection open\n" ); return false; } else { - $conn = $this->openConnection( $index ); + $conn = $this->openConnection( $index, '' ); if ( !$conn ) { wfDebug( __METHOD__ . ": failed to open connection\n" ); return false; @@ -433,15 +416,15 @@ class LoadBalancer { } } - wfDebug( __METHOD__.": Waiting for slave #$index to catch up...\n" ); + wfDebug( __METHOD__ . ": Waiting for slave #$index to catch up...\n" ); $result = $conn->masterPosWait( $this->mWaitForPos, $this->mWaitTimeout ); if ( $result == -1 || is_null( $result ) ) { # Timed out waiting for slave, use master instead - wfDebug( __METHOD__.": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" ); + wfDebug( __METHOD__ . ": Timed out waiting for slave #$index pos {$this->mWaitForPos}\n" ); return false; } else { - wfDebug( __METHOD__.": Done\n" ); + wfDebug( __METHOD__ . ": Done\n" ); return true; } } @@ -451,17 +434,20 @@ class LoadBalancer { * This is the main entry point for this class. * * @param $i Integer: server index - * @param $groups Array: query groups - * @param $wiki String: wiki ID + * @param array $groups query groups + * @param bool|string $wiki Wiki ID * + * @throws MWException * @return DatabaseBase */ public function &getConnection( $i, $groups = array(), $wiki = false ) { wfProfileIn( __METHOD__ ); if ( $i == DB_LAST ) { + wfProfileOut( __METHOD__ ); throw new MWException( 'Attempt to call ' . __METHOD__ . ' with deprecated server index DB_LAST' ); } elseif ( $i === null || $i === false ) { + wfProfileOut( __METHOD__ ); throw new MWException( 'Attempt to call ' . __METHOD__ . ' with invalid server index' ); } @@ -476,7 +462,7 @@ class LoadBalancer { $groupIndex = $this->getReaderIndex( $groups, $wiki ); if ( $groupIndex !== false ) { $serverName = $this->getServerName( $groupIndex ); - wfDebug( __METHOD__.": using server $serverName for group $groups\n" ); + wfDebug( __METHOD__ . ": using server $serverName for group $groups\n" ); $i = $groupIndex; } } else { @@ -484,7 +470,7 @@ class LoadBalancer { $groupIndex = $this->getReaderIndex( $group, $wiki ); if ( $groupIndex !== false ) { $serverName = $this->getServerName( $groupIndex ); - wfDebug( __METHOD__.": using server $serverName for group $group\n" ); + wfDebug( __METHOD__ . ": using server $serverName for group $group\n" ); $i = $groupIndex; break; } @@ -493,20 +479,21 @@ class LoadBalancer { # Operation-based index if ( $i == DB_SLAVE ) { + $this->mLastError = 'Unknown error'; // reset error string $i = $this->getReaderIndex( false, $wiki ); # Couldn't find a working server in getReaderIndex()? if ( $i === false ) { $this->mLastError = 'No working slave server: ' . $this->mLastError; - $this->reportConnectionError( $this->mErrorConnection ); wfProfileOut( __METHOD__ ); - return false; + return $this->reportConnectionError(); } } # Now we have an explicit index into the servers array $conn = $this->openConnection( $i, $wiki ); if ( !$conn ) { - $this->reportConnectionError( $this->mErrorConnection ); + wfProfileOut( __METHOD__ ); + return $this->reportConnectionError(); } wfProfileOut( __METHOD__ ); @@ -519,10 +506,11 @@ class LoadBalancer { * the same number of times as getConnection() to work. * * @param DatabaseBase $conn + * @throws MWException */ public function reuseConnection( $conn ) { - $serverIndex = $conn->getLBInfo('serverIndex'); - $refCount = $conn->getLBInfo('foreignPoolRefCount'); + $serverIndex = $conn->getLBInfo( 'serverIndex' ); + $refCount = $conn->getLBInfo( 'foreignPoolRefCount' ); $dbName = $conn->getDBname(); $prefix = $conn->tablePrefix(); if ( strval( $prefix ) !== '' ) { @@ -531,7 +519,7 @@ class LoadBalancer { $wiki = $dbName; } if ( $serverIndex === null || $refCount === null ) { - wfDebug( __METHOD__.": this connection was not opened as a foreign connection\n" ); + wfDebug( __METHOD__ . ": this connection was not opened as a foreign connection\n" ); /** * This can happen in code like: * foreach ( $dbs as $db ) { @@ -545,19 +533,51 @@ class LoadBalancer { return; } if ( $this->mConns['foreignUsed'][$serverIndex][$wiki] !== $conn ) { - throw new MWException( __METHOD__.": connection not found, has the connection been freed already?" ); + throw new MWException( __METHOD__ . ": connection not found, has the connection been freed already?" ); } $conn->setLBInfo( 'foreignPoolRefCount', --$refCount ); if ( $refCount <= 0 ) { $this->mConns['foreignFree'][$serverIndex][$wiki] = $conn; unset( $this->mConns['foreignUsed'][$serverIndex][$wiki] ); - wfDebug( __METHOD__.": freed connection $serverIndex/$wiki\n" ); + wfDebug( __METHOD__ . ": freed connection $serverIndex/$wiki\n" ); } else { - wfDebug( __METHOD__.": reference count for $serverIndex/$wiki reduced to $refCount\n" ); + wfDebug( __METHOD__ . ": reference count for $serverIndex/$wiki reduced to $refCount\n" ); } } /** + * Get a database connection handle reference + * + * The handle's methods wrap simply wrap those of a DatabaseBase handle + * + * @see LoadBalancer::getConnection() for parameter information + * + * @param integer $db + * @param mixed $groups + * @param string $wiki + * @return DBConnRef + */ + public function getConnectionRef( $db, $groups = array(), $wiki = false ) { + return new DBConnRef( $this, $this->getConnection( $db, $groups, $wiki ) ); + } + + /** + * Get a database connection handle reference without connecting yet + * + * The handle's methods wrap simply wrap those of a DatabaseBase handle + * + * @see LoadBalancer::getConnection() for parameter information + * + * @param integer $db + * @param mixed $groups + * @param string $wiki + * @return DBConnRef + */ + public function getLazyConnectionRef( $db, $groups = array(), $wiki = false ) { + return new DBConnRef( $this, array( $db, $groups, $wiki ) ); + } + + /** * Open a connection to the server given by the specified index * Index must be an actual index into the array. * If the server is already open, returns it. @@ -566,7 +586,7 @@ class LoadBalancer { * error will be available via $this->mErrorConnection. * * @param $i Integer server index - * @param $wiki String wiki ID to open + * @param string $wiki wiki ID to open * @return DatabaseBase * * @access private @@ -575,7 +595,7 @@ class LoadBalancer { wfProfileIn( __METHOD__ ); if ( $wiki !== false ) { $conn = $this->openForeignConnection( $i, $wiki ); - wfProfileOut( __METHOD__); + wfProfileOut( __METHOD__ ); return $conn; } if ( isset( $this->mConns['local'][$i][0] ) ) { @@ -585,6 +605,7 @@ class LoadBalancer { $server['serverIndex'] = $i; $conn = $this->reallyOpenConnection( $server, false ); if ( $conn->isOpen() ) { + wfDebug( "Connected to database $i at {$this->mServers[$i]['host']}\n" ); $this->mConns['local'][$i][0] = $conn; } else { wfDebug( "Failed to connect to database $i at {$this->mServers[$i]['host']}\n" ); @@ -611,22 +632,22 @@ class LoadBalancer { * error will be available via $this->mErrorConnection. * * @param $i Integer: server index - * @param $wiki String: wiki ID to open + * @param string $wiki wiki ID to open * @return DatabaseBase */ function openForeignConnection( $i, $wiki ) { - wfProfileIn(__METHOD__); + wfProfileIn( __METHOD__ ); list( $dbName, $prefix ) = wfSplitWikiID( $wiki ); if ( isset( $this->mConns['foreignUsed'][$i][$wiki] ) ) { // Reuse an already-used connection $conn = $this->mConns['foreignUsed'][$i][$wiki]; - wfDebug( __METHOD__.": reusing connection $i/$wiki\n" ); + wfDebug( __METHOD__ . ": reusing connection $i/$wiki\n" ); } elseif ( isset( $this->mConns['foreignFree'][$i][$wiki] ) ) { // Reuse a free connection for the same wiki $conn = $this->mConns['foreignFree'][$i][$wiki]; unset( $this->mConns['foreignFree'][$i][$wiki] ); $this->mConns['foreignUsed'][$i][$wiki] = $conn; - wfDebug( __METHOD__.": reusing free connection $i/$wiki\n" ); + wfDebug( __METHOD__ . ": reusing free connection $i/$wiki\n" ); } elseif ( !empty( $this->mConns['foreignFree'][$i] ) ) { // Reuse a connection from another wiki $conn = reset( $this->mConns['foreignFree'][$i] ); @@ -641,22 +662,23 @@ class LoadBalancer { $conn->tablePrefix( $prefix ); unset( $this->mConns['foreignFree'][$i][$oldWiki] ); $this->mConns['foreignUsed'][$i][$wiki] = $conn; - wfDebug( __METHOD__.": reusing free connection from $oldWiki for $wiki\n" ); + wfDebug( __METHOD__ . ": reusing free connection from $oldWiki for $wiki\n" ); } } else { // Open a new connection $server = $this->mServers[$i]; $server['serverIndex'] = $i; $server['foreignPoolRefCount'] = 0; + $server['foreign'] = true; $conn = $this->reallyOpenConnection( $server, $dbName ); if ( !$conn->isOpen() ) { - wfDebug( __METHOD__.": error opening connection for $i/$wiki\n" ); + wfDebug( __METHOD__ . ": error opening connection for $i/$wiki\n" ); $this->mErrorConnection = $conn; $conn = false; } else { $conn->tablePrefix( $prefix ); $this->mConns['foreignUsed'][$i][$wiki] = $conn; - wfDebug( __METHOD__.": opened new connection for $i/$wiki\n" ); + wfDebug( __METHOD__ . ": opened new connection for $i/$wiki\n" ); } } @@ -665,7 +687,7 @@ class LoadBalancer { $refCount = $conn->getLBInfo( 'foreignPoolRefCount' ); $conn->setLBInfo( 'foreignPoolRefCount', $refCount + 1 ); } - wfProfileOut(__METHOD__); + wfProfileOut( __METHOD__ ); return $conn; } @@ -677,7 +699,7 @@ class LoadBalancer { * @return bool */ function isOpen( $index ) { - if( !is_integer( $index ) ) { + if ( !is_integer( $index ) ) { return false; } return (bool)$this->getAnyOpenConnection( $index ); @@ -690,23 +712,20 @@ class LoadBalancer { * * @param $server * @param $dbNameOverride bool + * @throws MWException * @return DatabaseBase */ function reallyOpenConnection( $server, $dbNameOverride = false ) { - if( !is_array( $server ) ) { + if ( !is_array( $server ) ) { throw new MWException( 'You must update your load-balancing configuration. ' . 'See DefaultSettings.php entry for $wgDBservers.' ); } - $host = $server['host']; - $dbname = $server['dbname']; - if ( $dbNameOverride !== false ) { - $server['dbname'] = $dbname = $dbNameOverride; + $server['dbname'] = $dbNameOverride; } # Create object - wfDebug( "Connecting to $host $dbname...\n" ); try { $db = DatabaseBase::factory( $server['type'], $server ); } catch ( DBConnectionError $e ) { @@ -715,11 +734,6 @@ class LoadBalancer { $db = $e->db; } - if ( $db->isOpen() ) { - wfDebug( "Connected to $host $dbname.\n" ); - } else { - wfDebug( "Connection failed to $host $dbname.\n" ); - } $db->setLBInfo( $server ); if ( isset( $server['fakeSlaveLag'] ) ) { $db->setFakeSlaveLag( $server['fakeSlaveLag'] ); @@ -731,24 +745,24 @@ class LoadBalancer { } /** - * @param $conn * @throws DBConnectionError + * @return bool */ - function reportConnectionError( &$conn ) { - wfProfileIn( __METHOD__ ); + private function reportConnectionError() { + $conn = $this->mErrorConnection; // The connection which caused the error if ( !is_object( $conn ) ) { // No last connection, probably due to all servers being too busy wfLogDBError( "LB failure with no last connection. Connection error: {$this->mLastError}\n" ); - $conn = new Database; + // If all servers were busy, mLastError will contain something sensible - throw new DBConnectionError( $conn, $this->mLastError ); + throw new DBConnectionError( null, $this->mLastError ); } else { $server = $conn->getProperty( 'mServer' ); wfLogDBError( "Connection error: {$this->mLastError} ({$server})\n" ); - $conn->reportConnectionError( "{$this->mLastError} ({$server})" ); + $conn->reportConnectionError( "{$this->mLastError} ({$server})" ); // throws DBConnectionError } - wfProfileOut( __METHOD__ ); + return false; /* not reached */ } /** @@ -853,7 +867,7 @@ class LoadBalancer { */ function closeAll() { foreach ( $this->mConns as $conns2 ) { - foreach ( $conns2 as $conns3 ) { + foreach ( $conns2 as $conns3 ) { foreach ( $conns3 as $conn ) { $conn->close(); } @@ -909,7 +923,9 @@ class LoadBalancer { foreach ( $this->mConns as $conns2 ) { foreach ( $conns2 as $conns3 ) { foreach ( $conns3 as $conn ) { - $conn->commit( __METHOD__ ); + if ( $conn->trxLevel() ) { + $conn->commit( __METHOD__, 'flush' ); + } } } } @@ -926,8 +942,8 @@ class LoadBalancer { continue; } foreach ( $conns2[$masterIndex] as $conn ) { - if ( $conn->writesOrCallbacksPending() ) { - $conn->commit( __METHOD__ ); + if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) { + $conn->commit( __METHOD__, 'flush' ); } } } @@ -954,10 +970,11 @@ class LoadBalancer { * @return bool */ function allowLagged( $mode = null ) { - if ( $mode === null) { + if ( $mode === null ) { return $this->mAllowLagged; } $this->mAllowLagged = $mode; + return $this->mAllowLagged; } /** @@ -999,7 +1016,7 @@ class LoadBalancer { * May attempt to open connections to slaves on the default DB. If there is * no lag, the maximum lag will be reported as -1. * - * @param $wiki string Wiki ID, or false for the default database + * @param string $wiki Wiki ID, or false for the default database * * @return array ( host, max lag, index of max lagged host ) */ @@ -1084,3 +1101,46 @@ class LoadBalancer { $this->mLagTimes = null; } } + +/** + * Helper class to handle automatically marking connectons as reusable (via RAII pattern) + * as well handling deferring the actual network connection until the handle is used + * + * @ingroup Database + * @since 1.22 + */ +class DBConnRef implements IDatabase { + /** @var LoadBalancer */ + protected $lb; + /** @var DatabaseBase|null */ + protected $conn; + /** @var Array|null */ + protected $params; + + /** + * @param $lb LoadBalancer + * @param $conn DatabaseBase|array Connection or (server index, group, wiki ID) array + */ + public function __construct( LoadBalancer $lb, $conn ) { + $this->lb = $lb; + if ( $conn instanceof DatabaseBase ) { + $this->conn = $conn; + } else { + $this->params = $conn; + } + } + + public function __call( $name, $arguments ) { + if ( $this->conn === null ) { + list( $db, $groups, $wiki ) = $this->params; + $this->conn = $this->lb->getConnection( $db, $groups, $wiki ); + } + return call_user_func_array( array( $this->conn, $name ), $arguments ); + } + + function __destruct() { + if ( $this->conn !== null ) { + $this->lb->reuseConnection( $this->conn ); + } + } +} |