servers = isset( $params['redisServers'] ) ? $params['redisServers'] : array( $params['redisServer'] ); // b/c $params['redisConfig']['serializer'] = 'none'; $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); } protected function doNotifyQueueEmpty( $wiki, $type ) { $conn = $this->getConnection(); if ( !$conn ) { return false; } try { $conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) ); return true; } catch ( RedisException $e ) { $this->handleException( $conn, $e ); return false; } } protected function doNotifyQueueNonEmpty( $wiki, $type ) { $conn = $this->getConnection(); if ( !$conn ) { return false; } try { $conn->multi( Redis::PIPELINE ); $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' ); $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() ); $conn->exec(); return true; } catch ( RedisException $e ) { $this->handleException( $conn, $e ); return false; } } protected function doGetAllReadyWikiQueues() { $conn = $this->getConnection(); if ( !$conn ) { return array(); } try { $map = $conn->hGetAll( $this->getReadyQueueKey() ); if ( is_array( $map ) && isset( $map['_epoch'] ) ) { unset( $map['_epoch'] ); // ignore $pendingDBs = array(); // (type => list of wikis) foreach ( $map as $key => $time ) { list( $type, $wiki ) = $this->dencQueueName( $key ); $pendingDBs[$type][] = $wiki; } } else { // Avoid duplicated effort $rand = wfRandomString( 32 ); $conn->multi( Redis::MULTI ); $conn->setex( "{$rand}:lock", 3600, 1 ); $conn->renamenx( "{$rand}:lock", $this->getReadyQueueKey() . ":lock" ); if ( $conn->exec() !== array( true, true ) ) { // lock $conn->delete( "{$rand}:lock" ); return array(); // already in progress } $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis) $conn->multi( Redis::PIPELINE ); $now = time(); $map = array( '_epoch' => time() ); // dummy key for empty Redis collections foreach ( $pendingDBs as $type => $wikis ) { $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' ); foreach ( $wikis as $wiki ) { $map[$this->encQueueName( $type, $wiki )] = $now; } } $conn->hMSet( $this->getReadyQueueKey(), $map ); $conn->exec(); $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock } return $pendingDBs; } catch ( RedisException $e ) { $this->handleException( $conn, $e ); return array(); } } protected function doPurge() { $conn = $this->getConnection(); if ( !$conn ) { return false; } try { $conn->delete( $this->getReadyQueueKey() ); // leave key at getQueueTypesKey() alone } catch ( RedisException $e ) { $this->handleException( $conn, $e ); return false; } return true; } /** * Get a connection to the server that handles all sub-queues for this queue * * @return RedisConnRef|bool Returns false on failure * @throws MWException */ protected function getConnection() { $conn = false; foreach ( $this->servers as $server ) { $conn = $this->redisPool->getConnection( $server ); if ( $conn ) { break; } } return $conn; } /** * @param RedisConnRef $conn * @param RedisException $e * @return void */ protected function handleException( RedisConnRef $conn, $e ) { $this->redisPool->handleError( $conn, $e ); } /** * @return string */ private function getReadyQueueKey() { return "jobqueue:aggregator:h-ready-queues:v2"; // global } /** * @return string */ private function getQueueTypesKey() { return "jobqueue:aggregator:h-queue-types:v2"; // global } /** * @param string $type * @param string $wiki * @return string */ private function encQueueName( $type, $wiki ) { return rawurlencode( $type ) . '/' . rawurlencode( $wiki ); } /** * @param string $name * @return string */ private function dencQueueName( $name ) { list( $type, $wiki ) = explode( '/', $name, 2 ); return array( rawurldecode( $type ), rawurldecode( $wiki ) ); } }