summaryrefslogtreecommitdiff
path: root/maintenance/gearman
diff options
context:
space:
mode:
Diffstat (limited to 'maintenance/gearman')
-rw-r--r--maintenance/gearman/gearman.inc104
-rw-r--r--maintenance/gearman/gearmanRefreshLinks.php45
-rw-r--r--maintenance/gearman/gearmanWorker.php41
3 files changed, 190 insertions, 0 deletions
diff --git a/maintenance/gearman/gearman.inc b/maintenance/gearman/gearman.inc
new file mode 100644
index 00000000..a2a4018a
--- /dev/null
+++ b/maintenance/gearman/gearman.inc
@@ -0,0 +1,104 @@
+<?php
+
+require( 'Net/Gearman/Client.php' );
+require( 'Net/Gearman/Worker.php' );
+
+class MWGearmanJob extends Net_Gearman_Job_Common {
+ function switchWiki( $wiki, $params ) {
+ echo "Switching to $wiki\n";
+
+ # Pretend that we have completed it right now, because the new process won't do it
+ $this->complete( array( 'result' => true ) );
+ socket_close( $this->conn );
+
+ # Close some more sockets
+ wfGetLBFactory()->shutdown();
+ global $wgMemc;
+ $wgMemc->disconnect_all();
+
+ # Find PHP
+ $php = readlink( '/proc/' . posix_getpid() . '/exe' );
+
+ # Run the worker script
+ $args = array( $_SERVER['PHP_SELF'],
+ '--wiki', $wiki,
+ '--fake-job', serialize( $params ) );
+ $args = array_merge( $args, $GLOBALS['args'] );
+ pcntl_exec( $php, $args, $_ENV );
+ echo "Error running exec\n";
+ }
+
+ function run( $params ) {
+ if ( wfWikiID() !== $params['wiki'] ) {
+ $this->switchWiki( $params['wiki'], $params );
+ }
+ return self::runNoSwitch( $params );
+ }
+
+ static function runNoSwitch( $params ) {
+ echo implode( ' ', $params ) . "\n";
+ $title = Title::newFromText( $params['title'] );
+ $mwJob = Job::factory( $params['command'], $title, $params['params'] );
+ return $mwJob->run();
+ }
+}
+
+class NonScaryGearmanWorker extends Net_Gearman_Worker {
+
+ /**
+ * Copied from Net_Gearman_Worker but with the scary "run any PHP file in
+ * the filesystem" feature removed.
+ */
+ protected function doWork($socket) {
+ Net_Gearman_Connection::send($socket, 'grab_job');
+
+ $resp = array('function' => 'noop');
+ while (count($resp) && $resp['function'] == 'noop') {
+ $resp = Net_Gearman_Connection::blockingRead($socket);
+ }
+
+ if (in_array($resp['function'], array('noop', 'no_job'))) {
+ return false;
+ }
+
+ if ($resp['function'] != 'job_assign') {
+ throw new Net_Gearman_Exception('Holy Cow! What are you doing?!');
+ }
+
+ $name = $resp['data']['func'];
+ $handle = $resp['data']['handle'];
+ $arg = array();
+
+ if (isset($resp['data']['arg']) &&
+ Net_Gearman_Connection::stringLength($resp['data']['arg'])) {
+ $arg = json_decode($resp['data']['arg'], true);
+ }
+
+ ### START MW DIFFERENT BIT
+ if ( $name != 'mw_job' ) {
+ throw new Net_Gearman_Job_Exception('Invalid function');
+ }
+ $job = new MWGearmanJob($socket, $handle);
+ ### END MW DIFFERENT BIT
+
+ try {
+ $this->start($handle, $name, $arg);
+ $res = $job->run($arg);
+ if (!is_array($res)) {
+ $res = array('result' => $res);
+ }
+
+ $job->complete($res);
+ $this->complete($handle, $name, $res);
+ } catch (Net_Gearman_Job_Exception $e) {
+ $job->fail();
+ $this->fail($handle, $name, $e);
+ }
+
+ // Force the job's destructor to run
+ $job = null;
+
+ return true;
+ }
+}
+
diff --git a/maintenance/gearman/gearmanRefreshLinks.php b/maintenance/gearman/gearmanRefreshLinks.php
new file mode 100644
index 00000000..eb3104eb
--- /dev/null
+++ b/maintenance/gearman/gearmanRefreshLinks.php
@@ -0,0 +1,45 @@
+<?php
+
+$optionsWithArgs = array( 'fake-job' );
+
+require( dirname(__FILE__).'/../commandLine.inc' );
+require( dirname(__FILE__).'/gearman.inc' );
+
+if ( !$args ) {
+ $args = array( 'localhost' );
+}
+$client = new Net_Gearman_Client( $args );
+$batchSize = 1000;
+
+$dbr = wfGetDB( DB_SLAVE );
+$startId = 0;
+$endId = $dbr->selectField( 'page', 'MAX(page_id)', false, __METHOD__ );
+while ( true ) {
+ $res = $dbr->select(
+ 'page',
+ array( 'page_namespace', 'page_title', 'page_id' ),
+ array( 'page_id > ' . intval( $startId ) ),
+ __METHOD__,
+ array( 'LIMIT' => $batchSize )
+ );
+
+ if ( $res->numRows() == 0 ) {
+ break;
+ }
+ $set = new Net_Gearman_Set;
+ foreach ( $res as $row ) {
+ $startId = $row->page_id;
+ $title = Title::makeTitle( $row->page_namespace, $row->page_title );
+ $params = array(
+ 'wiki' => wfWikiID(),
+ 'title' => $title->getPrefixedDBkey(),
+ 'command' => 'refreshLinks',
+ 'params' => false,
+ );
+ $task = new Net_Gearman_Task( 'mw_job', $params );
+ $set->addTask( $task );
+ }
+ $client->runSet( $set );
+ print "$startId / $endId\n";
+}
+
diff --git a/maintenance/gearman/gearmanWorker.php b/maintenance/gearman/gearmanWorker.php
new file mode 100644
index 00000000..0b26ff9f
--- /dev/null
+++ b/maintenance/gearman/gearmanWorker.php
@@ -0,0 +1,41 @@
+<?php
+
+$optionsWithArgs = array( 'fake-job', 'procs' );
+require( dirname(__FILE__).'/../commandLine.inc' );
+require( dirname(__FILE__).'/gearman.inc' );
+
+if ( isset( $options['procs'] ) ) {
+ $procs = $options['procs'];
+ if ( $procs < 1 || $procs > 1000 ) {
+ echo "Invalid number of processes, please specify a number between 1 and 1000\n";
+ exit( 1 );
+ }
+ $fc = new ForkController( $procs, ForkController::RESTART_ON_ERROR );
+ if ( $fc->start() != 'child' ) {
+ exit( 0 );
+ }
+}
+
+if ( !$args ) {
+ $args = array( 'localhost' );
+}
+
+if ( isset( $options['fake-job'] ) ) {
+ $params = unserialize( $options['fake-job'] );
+ MWGearmanJob::runNoSwitch( $params );
+}
+
+$worker = new NonScaryGearmanWorker( $args );
+$worker->addAbility( 'mw_job' );
+$worker->beginWork( 'wfGearmanMonitor' );
+
+function wfGearmanMonitor( $idle, $lastJob ) {
+ static $lastSleep = 0;
+ $interval = 5;
+ $now = time();
+ if ( $now - $lastSleep >= $interval ) {
+ wfWaitForSlaves( $interval );
+ $lastSleep = $now;
+ }
+ return false;
+}