summaryrefslogtreecommitdiff
path: root/maintenance/gearman/gearman.inc
blob: a2a4018aa323ebc5129cfab98bbaa6114dc7a32e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
<?php

require( 'Net/Gearman/Client.php' );
require( 'Net/Gearman/Worker.php' );

/**
 * Gearman job handler that runs a MediaWiki job described by the job
 * arguments. If the job is for a wiki other than the one this process was
 * started for, the process re-executes itself for the right wiki.
 */
class MWGearmanJob extends Net_Gearman_Job_Common {
	/**
	 * Replace the current process with a new worker process for another wiki.
	 *
	 * The job is reported to the server as complete before the exec, because
	 * the new process is started with --fake-job and will not report it.
	 * This method never returns: on success the process image is replaced,
	 * on failure the process exits with status 1.
	 *
	 * @param $wiki String: ID of the wiki to switch to (passed as --wiki)
	 * @param $params Array: job parameters, serialized into --fake-job
	 */
	function switchWiki( $wiki, $params ) {
		echo "Switching to $wiki\n";

		# Pretend that we have completed it right now, because the new process won't do it
		$this->complete( array( 'result' => true ) );
		socket_close( $this->conn );

		# Close some more sockets
		wfGetLBFactory()->shutdown();
		global $wgMemc;
		$wgMemc->disconnect_all();

		# Find PHP
		$php = readlink( '/proc/' . posix_getpid() . '/exe' );

		# Run the worker script
		$args = array( $_SERVER['PHP_SELF'],
			'--wiki', $wiki,
			'--fake-job', serialize( $params ) );
		$args = array_merge( $args, $GLOBALS['args'] );
		pcntl_exec( $php, $args, $_ENV );

		# pcntl_exec() only returns if it failed. By now the job has already been
		# reported complete and the gearman socket, the load balancer factory and
		# memcached have been shut down, and this process is still set up for
		# the wrong wiki. Returning would let run() execute the job against the
		# wrong wiki, so bail out instead.
		echo "Error running exec\n";
		exit( 1 );
	}

	/**
	 * Entry point for a job: run it here if it belongs to the current wiki,
	 * otherwise hand over to a new process via switchWiki() (which does not
	 * return).
	 *
	 * @param $params Array: job description; 'wiki' is read here, the rest is
	 *   used by runNoSwitch()
	 * @return The return value of the MediaWiki job's run() method
	 */
	function run( $params ) {
		if ( wfWikiID() !== $params['wiki'] ) {
			$this->switchWiki( $params['wiki'], $params );
		}
		return self::runNoSwitch( $params );
	}

	/**
	 * Run the described job in the current process, without checking which
	 * wiki it is meant for.
	 *
	 * @param $params Array: job description with the keys 'title' (title
	 *   text), 'command' (job type passed to Job::factory) and 'params'
	 *   (parameters passed to Job::factory)
	 * @return The return value of the MediaWiki job's run() method
	 */
	static function runNoSwitch( $params ) {
		# Log the job description to stdout
		echo implode( ' ', $params ) . "\n";
		$title = Title::newFromText( $params['title'] );
		$mwJob = Job::factory( $params['command'], $title, $params['params'] );
		return $mwJob->run();
	}
}

/**
 * Gearman worker that only ever runs the 'mw_job' function, always through
 * MWGearmanJob.
 */
class NonScaryGearmanWorker extends Net_Gearman_Worker {

	/**
	 * Ask the server on the given connection for a job and run it.
	 *
	 * Copied from Net_Gearman_Worker but with the scary "run any PHP file in
	 * the filesystem" feature removed: any function name other than 'mw_job'
	 * is rejected.
	 *
	 * @param $socket The connection to grab a job from
	 * @return bool False if the server answered noop/no_job, true once an
	 *   assigned job has been handled (whether it completed or failed)
	 * @throws Net_Gearman_Exception If the server sends an unexpected reply
	 * @throws Net_Gearman_Job_Exception If the function is not 'mw_job'
	 */
	protected function doWork($socket) {
		Net_Gearman_Connection::send($socket, 'grab_job');

		// Keep reading while the server sends noop packets; stop at the
		// first other reply (or an empty one).
		do {
			$reply = Net_Gearman_Connection::blockingRead($socket);
		} while (count($reply) && $reply['function'] == 'noop');

		// Nothing to do on this connection
		if (in_array($reply['function'], array('noop', 'no_job'))) {
			return false;
		}

		// The only other acceptable reply is a job assignment
		if ($reply['function'] != 'job_assign') {
			throw new Net_Gearman_Exception('Holy Cow! What are you doing?!');
		}

		$funcName  = $reply['data']['func'];
		$jobHandle = $reply['data']['handle'];

		// The job argument is JSON; a missing or empty one becomes an empty array
		$hasArg = isset($reply['data']['arg']) &&
			Net_Gearman_Connection::stringLength($reply['data']['arg']);
		$jobArg = $hasArg ? json_decode($reply['data']['arg'], true) : array();

		### START MW DIFFERENT BIT
		if ( $funcName != 'mw_job' ) {
			throw new Net_Gearman_Job_Exception('Invalid function');
		}
		$job = new MWGearmanJob($socket, $jobHandle);
		### END MW DIFFERENT BIT

		try {
			$this->start($jobHandle, $funcName, $jobArg);

			// Non-array results are wrapped before being reported
			$outcome = $job->run($jobArg);
			$outcome = is_array($outcome) ? $outcome : array('result' => $outcome);

			$job->complete($outcome);
			$this->complete($jobHandle, $funcName, $outcome);
		} catch (Net_Gearman_Job_Exception $e) {
			$job->fail();
			$this->fail($jobHandle, $funcName, $e);
		}

		// Drop our reference so that the job's destructor runs now
		unset($job);

		return true;
	}
}