summaryrefslogtreecommitdiff
path: root/includes/job/JobQueueGroup.php
diff options
context:
space:
mode:
Diffstat (limited to 'includes/job/JobQueueGroup.php')
-rw-r--r--includes/job/JobQueueGroup.php90
1 files changed, 83 insertions, 7 deletions
diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php
index 351c71a3..fa7fee5f 100644
--- a/includes/job/JobQueueGroup.php
+++ b/includes/job/JobQueueGroup.php
@@ -36,10 +36,14 @@ class JobQueueGroup {
protected $wiki; // string; wiki ID
+ /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
+ protected $coalescedQueues;
+
const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job
const USE_CACHE = 1; // integer; use process or persistent cache
+ const USE_PRIORITY = 2; // integer; respect deprioritization
const PROC_CACHE_TTL = 15; // integer; seconds
@@ -146,6 +150,9 @@ class JobQueueGroup {
*/
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
if ( is_string( $qtype ) ) { // specific job type
+ if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) {
+ return false; // back off
+ }
$job = $this->get( $qtype )->pop();
if ( !$job ) {
JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
@@ -167,6 +174,9 @@ class JobQueueGroup {
shuffle( $types ); // avoid starvation
foreach ( $types as $type ) { // for each queue...
+ if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) {
+ continue; // back off
+ }
$job = $this->get( $type )->pop();
if ( $job ) { // found
return $job;
@@ -247,15 +257,72 @@ class JobQueueGroup {
*/
public function getQueuesWithJobs() {
$types = array();
- foreach ( $this->getQueueTypes() as $type ) {
- if ( !$this->get( $type )->isEmpty() ) {
- $types[] = $type;
+ foreach ( $this->getCoalescedQueues() as $info ) {
+ $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
+ if ( is_array( $nonEmpty ) ) { // batching features supported
+ $types = array_merge( $types, $nonEmpty );
+ } else { // we have to go through the queues in the bucket one-by-one
+ foreach ( $info['types'] as $type ) {
+ if ( !$this->get( $type )->isEmpty() ) {
+ $types[] = $type;
+ }
+ }
}
}
return $types;
}
/**
+ * Get the size of the queus for a list of job types
+ *
+ * @return Array Map of (job type => size)
+ */
+ public function getQueueSizes() {
+ $sizeMap = array();
+ foreach ( $this->getCoalescedQueues() as $info ) {
+ $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
+ if ( is_array( $sizes ) ) { // batching features supported
+ $sizeMap = $sizeMap + $sizes;
+ } else { // we have to go through the queues in the bucket one-by-one
+ foreach ( $info['types'] as $type ) {
+ $sizeMap[$type] = $this->get( $type )->getSize();
+ }
+ }
+ }
+ return $sizeMap;
+ }
+
+ /**
+ * @return array
+ */
+ protected function getCoalescedQueues() {
+ global $wgJobTypeConf;
+
+ if ( $this->coalescedQueues === null ) {
+ $this->coalescedQueues = array();
+ foreach ( $wgJobTypeConf as $type => $conf ) {
+ $queue = JobQueue::factory(
+ array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
+ $loc = $queue->getCoalesceLocationInternal();
+ if ( !isset( $this->coalescedQueues[$loc] ) ) {
+ $this->coalescedQueues[$loc]['queue'] = $queue;
+ $this->coalescedQueues[$loc]['types'] = array();
+ }
+ if ( $type === 'default' ) {
+ $this->coalescedQueues[$loc]['types'] = array_merge(
+ $this->coalescedQueues[$loc]['types'],
+ array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
+ );
+ } else {
+ $this->coalescedQueues[$loc]['types'][] = $type;
+ }
+ }
+ }
+
+ return $this->coalescedQueues;
+ }
+
+ /**
* Check if jobs should not be popped of a queue right now.
* This is only used for performance, such as to avoid spamming
* the queue with many sub-jobs before they actually get run.
@@ -264,10 +331,15 @@ class JobQueueGroup {
* @return bool
*/
public function isQueueDeprioritized( $type ) {
+ if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) {
+ return $this->cache->get( 'isDeprioritized', $type );
+ }
if ( $type === 'refreshLinks2' ) {
// Don't keep converting refreshLinks2 => refreshLinks jobs if the
// later jobs have not been done yet. This helps throttle queue spam.
- return !$this->get( 'refreshLinks' )->isEmpty();
+ $deprioritized = !$this->get( 'refreshLinks' )->isEmpty();
+ $this->cache->set( 'isDeprioritized', $type, $deprioritized );
+ return $deprioritized;
}
return false;
}
@@ -298,9 +370,13 @@ class JobQueueGroup {
} elseif ( !isset( $lastRuns[$type][$task] )
|| $lastRuns[$type][$task] < ( time() - $definition['period'] ) )
{
- if ( call_user_func( $definition['callback'] ) !== null ) {
- $tasksRun[$type][$task] = time();
- ++$count;
+ try {
+ if ( call_user_func( $definition['callback'] ) !== null ) {
+ $tasksRun[$type][$task] = time();
+ ++$count;
+ }
+ } catch ( JobQueueError $e ) {
+ MWExceptionHandler::logException( $e );
}
}
}