server = $params['redisServer'];
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
	}

	/**
	 * @see JobQueueAggregator::doNotifyQueueEmpty()
	 */
	protected function doNotifyQueueEmpty( $wiki, $type ) {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->hDel( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ) );
			return true;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			return false;
		}
	}

	/**
	 * @see JobQueueAggregator::doNotifyQueueNonEmpty()
	 */
	protected function doNotifyQueueNonEmpty( $wiki, $type ) {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
			return true;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			return false;
		}
	}

	/**
	 * @see JobQueueAggregator::doGetAllReadyWikiQueues()
	 */
	protected function doGetAllReadyWikiQueues() {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return array();
		}
		try {
			// Check for the hash and fetch its contents in one round trip
			$conn->multi( Redis::PIPELINE );
			$conn->exists( $this->getReadyQueueKey() );
			$conn->hGetAll( $this->getReadyQueueKey() );

			list( $exists, $map ) = $conn->exec();

			if ( $exists ) { // cache hit
				$pendingDBs = array(); // (type => list of wikis)
				foreach ( $map as $key => $time ) {
					list( $type, $wiki ) = $this->dencQueueName( $key );
					$pendingDBs[$type][] = $wiki;
				}
			} else { // cache miss
				// Avoid duplicated effort
				$conn->multi( Redis::MULTI );
				$conn->setnx( $this->getReadyQueueKey() . ":lock", 1 );
				$conn->expire( $this->getReadyQueueKey() . ":lock", 3600 );
				if ( $conn->exec() !== array( true, true ) ) { // lock
					return array(); // already in progress
				}

				$pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)

				$conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock

				// Rebuild the ready-queue hash, stamping every entry with the current time
				$now = time();
				$map = array();
				foreach ( $pendingDBs as $type => $wikis ) {
					foreach ( $wikis as $wiki ) {
						$map[$this->encQueueName( $type, $wiki )] = $now;
					}
				}
				$conn->hMSet( $this->getReadyQueueKey(), $map );
			}

			return $pendingDBs;
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			return array();
		}
	}

	/**
	 * @see JobQueueAggregator::doPurge()
	 */
	protected function doPurge() {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return false;
		}
		try {
			$conn->delete( $this->getReadyQueueKey() );
		} catch ( RedisException $e ) {
			$this->handleException( $conn, $e );
			return false;
		}
		return true;
	}

	/**
	 * Get a connection to the server that handles all sub-queues for this queue
	 *
	 * @return RedisConnRef|bool Connection reference, or false on failure
	 * @throws MWException
	 */
	protected function getConnection() {
		return $this->redisPool->getConnection( $this->server );
	}

	/**
	 * @param RedisConnRef $conn
	 * @param RedisException $e
	 * @return void
	 */
	protected function handleException( RedisConnRef $conn, $e ) {
		$this->redisPool->handleException( $this->server, $conn, $e );
	}

	/**
	 * @return string
	 */
	private function getReadyQueueKey() {
		return "jobqueue:aggregator:h-ready-queues:v1"; // global
	}

	/**
	 * @param string $type
	 * @param string $wiki
	 * @return string
	 */
	private function encQueueName( $type, $wiki ) {
		return rawurlencode( $type ) . '/' . rawurlencode( $wiki );
	}

	/**
	 * @param string $name
	 * @return array (type, wiki)
	 */
	private function dencQueueName( $name ) {
		list( $type, $wiki ) = explode( '/', $name, 2 );
		return array( rawurldecode( $type ), rawurldecode( $wiki ) );
	}
}