From b9b85843572bf283f48285001e276ba7e61b63f6 Mon Sep 17 00:00:00 2001 From: Pierre Schmitz Date: Sun, 22 Feb 2009 13:37:51 +0100 Subject: updated to MediaWiki 1.14.0 --- maintenance/storage/blob_tracking.sql | 57 +++ maintenance/storage/compressOld.inc | 10 +- maintenance/storage/compressOld.php | 6 +- maintenance/storage/dumpRev.php | 50 +- maintenance/storage/orphanStats.php | 46 ++ maintenance/storage/recompressTracked.php | 742 ++++++++++++++++++++++++++++++ maintenance/storage/testCompression.php | 81 ++++ maintenance/storage/trackBlobs.php | 316 +++++++++++++ 8 files changed, 1290 insertions(+), 18 deletions(-) create mode 100644 maintenance/storage/blob_tracking.sql create mode 100644 maintenance/storage/orphanStats.php create mode 100644 maintenance/storage/recompressTracked.php create mode 100644 maintenance/storage/testCompression.php create mode 100644 maintenance/storage/trackBlobs.php (limited to 'maintenance/storage') diff --git a/maintenance/storage/blob_tracking.sql b/maintenance/storage/blob_tracking.sql new file mode 100644 index 00000000..6cac9a38 --- /dev/null +++ b/maintenance/storage/blob_tracking.sql @@ -0,0 +1,57 @@ + +-- Table for tracking blobs prior to recompression or similar maintenance operations + +CREATE TABLE /*$wgDBprefix*/blob_tracking ( + -- page.page_id + -- This may be zero for orphan or deleted text + -- Note that this is for compression grouping only -- it doesn't need to be + -- accurate at the time recompressTracked is run. Operations such as a + -- delete/undelete cycle may make it inaccurate. + bt_page integer not null, + + -- revision.rev_id + -- This may be zero for orphan or deleted text + -- Like bt_page, it does not need to be accurate when recompressTracked is run. + bt_rev_id integer not null, + + -- text.old_id + bt_text_id integer not null, + + -- The ES cluster + bt_cluster varbinary(255), + + -- The ES blob ID + bt_blob_id integer not null, + + -- The CGZ content hash, or null + bt_cgz_hash varbinary(255), + + -- The URL this blob is to be moved to + bt_new_url varbinary(255), + + -- True if the text table has been updated to point to bt_new_url + bt_moved bool not null default 0, + + -- Primary key + -- Note that text_id is not unique due to null edits (protection, move) + -- moveTextRow(), commit(), trackOrphanText() + PRIMARY KEY (bt_text_id, bt_rev_id), + + -- Sort by page for easy CGZ recompression + -- doAllPages(), doAllOrphans(), doPage(), finishIncompleteMoves() + KEY (bt_moved, bt_page, bt_text_id), + + -- Key for determining the revisions using a given blob + -- Not used by any scripts yet + KEY (bt_cluster, bt_blob_id, bt_cgz_hash) + +) /*$wgDBTableOptions*/; + +-- Tracking table for blob rows that aren't tracked by the text table +CREATE TABLE /*$wgDBprefix*/blob_orphans ( + bo_cluster varbinary(255), + bo_blob_id integer not null, + + PRIMARY KEY (bo_cluster, bo_blob_id) +) /*$wgDBTableOptions*/; + diff --git a/maintenance/storage/compressOld.inc b/maintenance/storage/compressOld.inc index 52b9c40b..fb8cc422 100644 --- a/maintenance/storage/compressOld.inc +++ b/maintenance/storage/compressOld.inc @@ -4,10 +4,6 @@ * @ingroup Maintenance ExternalStorage */ -/** */ -require_once( 'Revision.php' ); -require_once( 'ExternalStoreDB.php' ); - /** @todo document */ function compressOldPages( $start = 0, $extdb = '' ) { $fname = 'compressOldPages'; @@ -70,7 +66,7 @@ define( 'LS_INDIVIDUAL', 0 ); define( 'LS_CHUNKED', 1 ); /** @todo document */ -function compressWithConcat( $startId, $maxChunkSize, $maxChunkFactor, $factorThreshold, $beginDate, +function compressWithConcat( $startId, $maxChunkSize, $beginDate, $endDate, $extdb="", $maxPageId = false ) { $fname = 'compressWithConcat'; @@ -198,7 +194,7 @@ function compressWithConcat( $startId, $maxChunkSize, $maxChunkFactor, $factorTh $primaryOldid = $revs[$i]->rev_text_id; # Get the text of each revision and add it to the object - for ( $j = 0; $j < $thisChunkSize && $chunk->isHappy( $maxChunkFactor, $factorThreshold ); $j++ ) { + for ( $j = 0; $j < $thisChunkSize && $chunk->isHappy(); $j++ ) { $oldid = $revs[$i + $j]->rev_text_id; # Get text @@ -229,7 +225,7 @@ function compressWithConcat( $startId, $maxChunkSize, $maxChunkFactor, $factorTh $stub = false; print 'x'; } else { - $stub = $chunk->addItem( $text ); + $stub = new HistoryBlobStub( $chunk->addItem( $text ) ); $stub->setLocation( $primaryOldid ); $stub->setReferrer( $oldid ); print '.'; diff --git a/maintenance/storage/compressOld.php b/maintenance/storage/compressOld.php index dda765d7..6f8b48eb 100644 --- a/maintenance/storage/compressOld.php +++ b/maintenance/storage/compressOld.php @@ -18,8 +18,6 @@ * -b earliest date to check for uncompressed revisions * -e latest revision date to compress * -s the old_id to start from - * -f the maximum ratio of compressed chunk bytes to uncompressed avg. revision bytes - * -h is a minimum number of KB, where cuts in * --extdb store specified revisions in an external cluster (untested) * * @file @@ -40,8 +38,6 @@ $defaults = array( 't' => 'concat', 'c' => 20, 's' => 0, - 'f' => 5, - 'h' => 100, 'b' => '', 'e' => '', 'extdb' => '', @@ -62,7 +58,7 @@ if ( $options['extdb'] != '' ) { $success = true; if ( $options['t'] == 'concat' ) { - $success = compressWithConcat( $options['s'], $options['c'], $options['f'], $options['h'], $options['b'], + $success = compressWithConcat( $options['s'], $options['c'], $options['b'], $options['e'], $options['extdb'], $options['endid'] ); } else { compressOldPages( $options['s'], $options['extdb'] ); diff --git a/maintenance/storage/dumpRev.php b/maintenance/storage/dumpRev.php index 720eb958..c84d8aa5 100644 --- a/maintenance/storage/dumpRev.php +++ b/maintenance/storage/dumpRev.php @@ -6,13 +6,51 @@ require_once( dirname(__FILE__) . '/../commandLine.inc' ); +$wgDebugLogFile = '/dev/stdout'; + + $dbr = wfGetDB( DB_SLAVE ); -$row = $dbr->selectRow( 'text', array( 'old_flags', 'old_text' ), array( 'old_id' => $args[0] ) ); -$obj = unserialize( $row->old_text ); +$row = $dbr->selectRow( + array( 'text', 'revision' ), + array( 'old_flags', 'old_text' ), + array( 'old_id=rev_text_id', 'rev_id' => $args[0] ) +); +if ( !$row ) { + print "Row not found\n"; + exit; +} -if ( get_class( $obj ) == 'concatenatedgziphistoryblob' ) { - print_r( array_keys( $obj->mItems ) ); -} else { - var_dump( $obj ); +$flags = explode( ',', $row->old_flags ); +$text = $row->old_text; +if ( in_array( 'external', $flags ) ) { + print "External $text\n"; + if ( preg_match( '!^DB://(\w+)/(\w+)/(\w+)$!', $text, $m ) ) { + $es = ExternalStore::getStoreObject( 'DB' ); + $blob = $es->fetchBlob( $m[1], $m[2], $m[3] ); + if ( strtolower( get_class( $blob ) ) == 'concatenatedgziphistoryblob' ) { + print "Found external CGZ\n"; + $blob->uncompress(); + print "Items: (" . implode( ', ', array_keys( $blob->mItems ) ) . ")\n"; + $text = $blob->getItem( $m[3] ); + } else { + print "CGZ expected at $text, got " . gettype( $blob ) . "\n"; + $text = $blob; + } + } else { + print "External plain $text\n"; + $text = ExternalStore::fetchFromURL( $text ); + } +} +if ( in_array( 'gzip', $flags ) ) { + $text = gzinflate( $text ); +} +if ( in_array( 'object', $flags ) ) { + $text = unserialize( $text ); } +if ( is_object( $text ) ) { + print "Unexpectedly got object of type: " . get_class( $text ) . "\n"; +} else { + print "Text length: " . strlen( $text ) ."\n"; + print substr( $text, 0, 100 ) . "\n"; +} diff --git a/maintenance/storage/orphanStats.php b/maintenance/storage/orphanStats.php new file mode 100644 index 00000000..afea815e --- /dev/null +++ b/maintenance/storage/orphanStats.php @@ -0,0 +1,46 @@ +execute(); + +class OrphanStats { + function getDB( $cluster ) { + $lb = wfGetLBFactory()->getExternalLB( $cluster ); + return $lb->getConnection( DB_SLAVE ); + } + + function execute() { + $extDBs = array(); + $dbr = wfGetDB( DB_SLAVE ); + $res = $dbr->select( 'blob_orphans', '*', false, __METHOD__ ); + + $num = 0; + $totalSize = 0; + $hashes = array(); + $maxSize = 0; + + foreach ( $res as $boRow ) { + $extDB = $this->getDB( $boRow->bo_cluster ); + $blobRow = $extDB->selectRow( 'blobs', '*', array( 'blob_id' => $boRow->bo_blob_id ), __METHOD__ ); + + $num++; + $size = strlen( $blobRow->blob_text ); + $totalSize += $size; + $hashes[ sha1( $blobRow->blob_text ) ] = true; + $maxSize = max( $size, $maxSize ); + } + unset( $res ); + + echo "Number of orphans: $num\n"; + if ( $num > 0 ) { + echo "Average size: " . round( $totalSize / $num, 0 ) . " bytes\n" . + "Max size: $maxSize\n" . + "Number of unique texts: " . count( $hashes ) . "\n"; + } + } +} diff --git a/maintenance/storage/recompressTracked.php b/maintenance/storage/recompressTracked.php new file mode 100644 index 00000000..7e4ed1b4 --- /dev/null +++ b/maintenance/storage/recompressTracked.php @@ -0,0 +1,742 @@ + [... ...] +Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable. + +Options: + --procs Set the number of child processes (default 1) + --copy-only Copy only, do not update the text table. Restart without this option to complete. + --debug-log Log debugging data to the specified file + --info-log Log progress messages to the specified file + --critical-log Log error messages to the specified file +"; + exit( 1 ); +} + +$job = RecompressTracked::newFromCommandLine( $args, $options ); +$job->execute(); + +class RecompressTracked { + var $destClusters; + var $batchSize = 1000; + var $orphanBatchSize = 1000; + var $reportingInterval = 10; + var $numProcs = 1; + var $useDiff, $pageBlobClass, $orphanBlobClass; + var $slavePipes, $slaveProcs, $prevSlaveId; + var $copyOnly = false; + var $isChild = false; + var $slaveId = false; + var $debugLog, $infoLog, $criticalLog; + var $store; + + static $optionsWithArgs = array( 'procs', 'slave-id', 'debug-log', 'info-log', 'critical-log' ); + static $cmdLineOptionMap = array( + 'procs' => 'numProcs', + 'copy-only' => 'copyOnly', + 'child' => 'isChild', + 'slave-id' => 'slaveId', + 'debug-log' => 'debugLog', + 'info-log' => 'infoLog', + 'critical-log' => 'criticalLog', + ); + + static function getOptionsWithArgs() { + return self::$optionsWithArgs; + } + + static function newFromCommandLine( $args, $options ) { + $jobOptions = array( 'destClusters' => $args ); + foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) { + if ( isset( $options[$cmdOption] ) ) { + $jobOptions[$classOption] = $options[$cmdOption]; + } + } + return new self( $jobOptions ); + } + + function __construct( $options ) { + foreach ( $options as $name => $value ) { + $this->$name = $value; + } + $this->store = new ExternalStoreDB; + if ( !$this->isChild ) { + $GLOBALS['wgDebugLogPrefix'] = "RCT M: "; + } elseif ( $this->slaveId !== false ) { + $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->slaveId}: "; + } + $this->useDiff = function_exists( 'xdiff_string_bdiff' ); + $this->pageBlobClass = $this->useDiff ? 'DiffHistoryBlob' : 'ConcatenatedGzipHistoryBlob'; + $this->orphanBlobClass = 'ConcatenatedGzipHistoryBlob'; + } + + function debug( $msg ) { + wfDebug( "$msg\n" ); + if ( $this->debugLog ) { + $this->logToFile( $msg, $this->debugLog ); + } + + } + + function info( $msg ) { + echo "$msg\n"; + if ( $this->infoLog ) { + $this->logToFile( $msg, $this->infoLog ); + } + } + + function critical( $msg ) { + echo "$msg\n"; + if ( $this->criticalLog ) { + $this->logToFile( $msg, $this->criticalLog ); + } + } + + function logToFile( $msg, $file ) { + $header = '[' . date('d\TH:i:s') . '] ' . wfHostname() . ' ' . posix_getpid(); + if ( $this->slaveId !== false ) { + $header .= "({$this->slaveId})"; + } + $header .= ' ' . wfWikiID(); + wfErrorLog( sprintf( "%-50s %s\n", $header, $msg ), $file ); + } + + /** + * Wait until the selected slave has caught up to the master. + * This allows us to use the slave for things that were committed in a + * previous part of this batch process. + */ + function syncDBs() { + $dbw = wfGetDB( DB_MASTER ); + $dbr = wfGetDB( DB_SLAVE ); + $pos = $dbw->getMasterPos(); + $dbr->masterPosWait( $pos, 100000 ); + } + + /** + * Execute parent or child depending on the isChild option + */ + function execute() { + if ( $this->isChild ) { + $this->executeChild(); + } else { + $this->executeParent(); + } + } + + /** + * Execute the parent process + */ + function executeParent() { + if ( !$this->checkTrackingTable() ) { + return; + } + + $this->syncDBs(); + $this->startSlaveProcs(); + $this->doAllPages(); + $this->doAllOrphans(); + $this->killSlaveProcs(); + } + + /** + * Make sure the tracking table exists and isn't empty + */ + function checkTrackingTable() { + $dbr = wfGetDB( DB_SLAVE ); + if ( !$dbr->tableExists( 'blob_tracking' ) ) { + $this->critical( "Error: blob_tracking table does not exist" ); + return false; + } + $row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ ); + if ( !$row ) { + $this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." ); + return false; + } + return true; + } + + /** + * Start the worker processes. + * These processes will listen on stdin for commands. + * This necessary because text recompression is slow: loading, compressing and + * writing are all slow. + */ + function startSlaveProcs() { + $cmd = 'php ' . wfEscapeShellArg( __FILE__ ); + foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) { + if ( $cmdOption == 'slave-id' ) { + continue; + } elseif ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) { + $cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption ); + } elseif ( $this->$classOption ) { + $cmd .= " --$cmdOption"; + } + } + $cmd .= ' --child' . + ' --wiki ' . wfEscapeShellArg( wfWikiID() ) . + ' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters ); + + $this->slavePipes = $this->slaveProcs = array(); + for ( $i = 0; $i < $this->numProcs; $i++ ) { + $pipes = false; + $spec = array( + array( 'pipe', 'r' ), + array( 'file', 'php://stdout', 'w' ), + array( 'file', 'php://stderr', 'w' ) + ); + wfSuppressWarnings(); + $proc = proc_open( "$cmd --slave-id $i", $spec, $pipes ); + wfRestoreWarnings(); + if ( !$proc ) { + $this->critical( "Error opening slave process: $cmd" ); + exit( 1 ); + } + $this->slaveProcs[$i] = $proc; + $this->slavePipes[$i] = $pipes[0]; + } + $this->prevSlaveId = -1; + } + + /** + * Gracefully terminate the child processes + */ + function killSlaveProcs() { + $this->info( "Waiting for slave processes to finish..." ); + for ( $i = 0; $i < $this->numProcs; $i++ ) { + $this->dispatchToSlave( $i, 'quit' ); + } + for ( $i = 0; $i < $this->numProcs; $i++ ) { + $status = proc_close( $this->slaveProcs[$i] ); + if ( $status ) { + $this->critical( "Warning: child #$i exited with status $status" ); + } + } + $this->info( "Done." ); + } + + /** + * Dispatch a command to the next available slave. + * This may block until a slave finishes its work and becomes available. + */ + function dispatch( /*...*/ ) { + $args = func_get_args(); + $pipes = $this->slavePipes; + $numPipes = stream_select( $x=array(), $pipes, $y=array(), 3600 ); + if ( !$numPipes ) { + $this->critical( "Error waiting to write to slaves. Aborting" ); + exit( 1 ); + } + for ( $i = 0; $i < $this->numProcs; $i++ ) { + $slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs; + if ( isset( $pipes[$slaveId] ) ) { + $this->prevSlaveId = $slaveId; + $this->dispatchToSlave( $slaveId, $args ); + return; + } + } + $this->critical( "Unreachable" ); + exit( 1 ); + } + + /** + * Dispatch a command to a specified slave + */ + function dispatchToSlave( $slaveId, $args ) { + $args = (array)$args; + $cmd = implode( ' ', $args ); + fwrite( $this->slavePipes[$slaveId], "$cmd\n" ); + } + + /** + * Move all tracked pages to the new clusters + */ + function doAllPages() { + $dbr = wfGetDB( DB_SLAVE ); + $i = 0; + $startId = 0; + $numPages = $dbr->selectField( 'blob_tracking', + 'COUNT(DISTINCT bt_page)', + # A condition is required so that this query uses the index + array( 'bt_moved' => 0 ), + __METHOD__ + ); + if ( $this->copyOnly ) { + $this->info( "Copying pages..." ); + } else { + $this->info( "Moving pages..." ); + } + while ( true ) { + $res = $dbr->select( 'blob_tracking', + array( 'bt_page' ), + array( + 'bt_moved' => 0, + 'bt_page > ' . $dbr->addQuotes( $startId ) + ), + __METHOD__, + array( + 'DISTINCT', + 'ORDER BY' => 'bt_page', + 'LIMIT' => $this->batchSize, + ) + ); + if ( !$res->numRows() ) { + break; + } + foreach ( $res as $row ) { + $this->dispatch( 'doPage', $row->bt_page ); + $i++; + } + $startId = $row->bt_page; + $this->report( 'pages', $i, $numPages ); + } + $this->report( 'pages', $i, $numPages ); + if ( $this->copyOnly ) { + $this->info( "All page copies queued." ); + } else { + $this->info( "All page moves queued." ); + } + } + + /** + * Display a progress report + */ + function report( $label, $current, $end ) { + $this->numBatches++; + if ( $current == $end || $this->numBatches >= $this->reportingInterval ) { + $this->numBatches = 0; + $this->info( "$label: $current / $end" ); + wfWaitForSlaves( 5 ); + } + } + + /** + * Move all orphan text to the new clusters + */ + function doAllOrphans() { + $dbr = wfGetDB( DB_SLAVE ); + $startId = 0; + $i = 0; + $numOrphans = $dbr->selectField( 'blob_tracking', + 'COUNT(DISTINCT bt_text_id)', + array( 'bt_moved' => 0, 'bt_page' => 0 ), + __METHOD__ ); + if ( !$numOrphans ) { + return; + } + if ( $this->copyOnly ) { + $this->info( "Copying orphans..." ); + } else { + $this->info( "Moving orphans..." ); + } + $ids = array(); + + while ( true ) { + $res = $dbr->select( 'blob_tracking', + array( 'bt_text_id' ), + array( + 'bt_moved' => 0, + 'bt_page' => 0, + 'bt_text_id > ' . $dbr->addQuotes( $startId ) + ), + __METHOD__, + array( + 'DISTINCT', + 'ORDER BY' => 'bt_text_id', + 'LIMIT' => $this->batchSize + ) + ); + if ( !$res->numRows() ) { + break; + } + foreach ( $res as $row ) { + $ids[] = $row->bt_text_id; + $i++; + } + // Need to send enough orphan IDs to the child at a time to fill a blob, + // so orphanBatchSize needs to be at least ~100. + // batchSize can be smaller or larger. + while ( count( $ids ) > $this->orphanBatchSize ) { + $args = array_slice( $ids, 0, $this->orphanBatchSize ); + $ids = array_slice( $ids, $this->orphanBatchSize ); + array_unshift( $args, 'doOrphanList' ); + call_user_func_array( array( $this, 'dispatch' ), $args ); + } + $startId = $row->bt_text_id; + $this->report( 'orphans', $i, $numOrphans ); + } + $this->report( 'orphans', $i, $numOrphans ); + $this->info( "All orphans queued." ); + } + + /** + * Main entry point for worker processes + */ + function executeChild() { + $this->debug( 'starting' ); + $this->syncDBs(); + + while ( !feof( STDIN ) ) { + $line = rtrim( fgets( STDIN ) ); + if ( $line == '' ) { + continue; + } + $this->debug( $line ); + $args = explode( ' ', $line ); + $cmd = array_shift( $args ); + switch ( $cmd ) { + case 'doPage': + $this->doPage( intval( $args[0] ) ); + break; + case 'doOrphanList': + $this->doOrphanList( array_map( 'intval', $args ) ); + break; + case 'quit': + return; + } + wfWaitForSlaves( 5 ); + } + } + + /** + * Move tracked text in a given page + */ + function doPage( $pageId ) { + $title = Title::newFromId( $pageId ); + if ( $title ) { + $titleText = $title->getPrefixedText(); + } else { + $titleText = '[deleted]'; + } + $dbr = wfGetDB( DB_SLAVE ); + + // Finish any incomplete transactions + if ( !$this->copyOnly ) { + $this->finishIncompleteMoves( array( 'bt_page' => $pageId ) ); + $this->syncDBs(); + } + + $startId = 0; + $trx = new CgzCopyTransaction( $this, $this->pageBlobClass ); + + while ( true ) { + $res = $dbr->select( + array( 'blob_tracking', 'text' ), + '*', + array( + 'bt_page' => $pageId, + 'bt_text_id > ' . $dbr->addQuotes( $startId ), + 'bt_moved' => 0, + 'bt_new_url IS NULL', + 'bt_text_id=old_id', + ), + __METHOD__, + array( + 'ORDER BY' => 'bt_text_id', + 'LIMIT' => $this->batchSize + ) + ); + if ( !$res->numRows() ) { + break; + } + + $lastTextId = 0; + foreach ( $res as $row ) { + if ( $lastTextId == $row->bt_text_id ) { + // Duplicate (null edit) + continue; + } + $lastTextId = $row->bt_text_id; + // Load the text + $text = Revision::getRevisionText( $row ); + if ( $text === false ) { + $this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" ); + continue; + } + + // Queue it + if ( !$trx->addItem( $text, $row->bt_text_id ) ) { + $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" ); + $trx->commit(); + $trx = new CgzCopyTransaction( $this, $this->pageBlobClass ); + } + } + $startId = $row->bt_text_id; + } + + $this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" ); + $trx->commit(); + } + + /** + * Atomic move operation. + * + * Write the new URL to the text table and set the bt_moved flag. + * + * This is done in a single transaction to provide restartable behaviour + * without data loss. + * + * The transaction is kept short to reduce locking. + */ + function moveTextRow( $textId, $url ) { + if ( $this->copyOnly ) { + $this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" ); + exit( 1 ); + } + $dbw = wfGetDB( DB_MASTER ); + $dbw->begin(); + $dbw->update( 'text', + array( // set + 'old_text' => $url, + 'old_flags' => 'external,utf-8', + ), + array( // where + 'old_id' => $textId + ), + __METHOD__ + ); + $dbw->update( 'blob_tracking', + array( 'bt_moved' => 1 ), + array( 'bt_text_id' => $textId ), + __METHOD__ + ); + $dbw->commit(); + } + + /** + * Moves are done in two phases: bt_new_url and then bt_moved. + * - bt_new_url indicates that the text has been copied to the new cluster. + * - bt_moved indicates that the text table has been updated. + * + * This function completes any moves that only have done bt_new_url. This + * can happen when the script is interrupted, or when --copy-only is used. + */ + function finishIncompleteMoves( $conds ) { + $dbr = wfGetDB( DB_SLAVE ); + + $startId = 0; + $conds = array_merge( $conds, array( + 'bt_moved' => 0, + 'bt_new_url IS NOT NULL' + )); + while ( true ) { + $res = $dbr->select( 'blob_tracking', + '*', + array_merge( $conds, array( 'bt_text_id > ' . $dbr->addQuotes( $startId ) ) ), + __METHOD__, + array( + 'ORDER BY' => 'bt_text_id', + 'LIMIT' => $this->batchSize, + ) + ); + if ( !$res->numRows() ) { + break; + } + $this->debug( 'Incomplete: ' . $res->numRows() . ' rows' ); + foreach ( $res as $row ) { + $this->moveTextRow( $row->bt_text_id, $row->bt_new_url ); + } + $startId = $row->bt_text_id; + } + } + + /** + * Returns the name of the next target cluster + */ + function getTargetCluster() { + $cluster = next( $this->destClusters ); + if ( $cluster === false ) { + $cluster = reset( $this->destClusters ); + } + return $cluster; + } + + /** + * Gets a DB master connection for the given external cluster name + */ + function getExtDB( $cluster ) { + $lb = wfGetLBFactory()->getExternalLB( $cluster ); + return $lb->getConnection( DB_MASTER ); + } + + /** + * Move an orphan text_id to the new cluster + */ + function doOrphanList( $textIds ) { + // Finish incomplete moves + if ( !$this->copyOnly ) { + $this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) ); + $this->syncDBs(); + } + + $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass ); + + $res = wfGetDB( DB_SLAVE )->select( + array( 'text', 'blob_tracking' ), + array( 'old_id', 'old_text', 'old_flags' ), + array( + 'old_id' => $textIds, + 'bt_text_id=old_id', + 'bt_moved' => 0, + ), + __METHOD__, + array( 'DISTINCT' ) + ); + + foreach ( $res as $row ) { + $text = Revision::getRevisionText( $row ); + if ( $text === false ) { + $this->critical( "Error: cannot load revision text for old_id=$textId" ); + continue; + } + + if ( !$trx->addItem( $text, $row->old_id ) ) { + $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" ); + $trx->commit(); + $trx = new CgzCopyTransaction( $this, $this->orphanBlobClass ); + } + } + $this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" ); + $trx->commit(); + } +} + +/** + * Class to represent a recompression operation for a single CGZ blob + */ +class CgzCopyTransaction { + var $parent; + var $blobClass; + var $cgz; + var $referrers; + + /** + * Create a transaction from a RecompressTracked object + */ + function __construct( $parent, $blobClass ) { + $this->blobClass = $blobClass; + $this->cgz = false; + $this->texts = array(); + $this->parent = $parent; + } + + /** + * Add text. + * Returns false if it's ready to commit. + */ + function addItem( $text, $textId ) { + if ( !$this->cgz ) { + $class = $this->blobClass; + $this->cgz = new $class; + } + $hash = $this->cgz->addItem( $text ); + $this->referrers[$textId] = $hash; + $this->texts[$textId] = $text; + return $this->cgz->isHappy(); + } + + function getSize() { + return count( $this->texts ); + } + + /** + * Recompress text after some aberrant modification + */ + function recompress() { + $class = $this->blobClass; + $this->cgz = new $class; + $this->referrers = array(); + foreach ( $this->texts as $textId => $text ) { + $hash = $this->cgz->addItem( $text ); + $this->referrers[$textId] = $hash; + } + } + + /** + * Commit the blob. + * Does nothing if no text items have been added. + * May skip the move if --copy-only is set. + */ + function commit() { + $originalCount = count( $this->texts ); + if ( !$originalCount ) { + return; + } + + // Check to see if the target text_ids have been moved already. + // + // We originally read from the slave, so this can happen when a single + // text_id is shared between multiple pages. It's rare, but possible + // if a delete/move/undelete cycle splits up a null edit. + // + // We do a locking read to prevent closer-run race conditions. + $dbw = wfGetDB( DB_MASTER ); + $dbw->begin(); + $res = $dbw->select( 'blob_tracking', + array( 'bt_text_id', 'bt_moved' ), + array( 'bt_text_id' => array_keys( $this->referrers ) ), + __METHOD__, array( 'FOR UPDATE' ) ); + $dirty = false; + foreach ( $res as $row ) { + if ( $row->bt_moved ) { + # This row has already been moved, remove it + $this->parent->debug( "TRX: conflict detected in old_id={$row->bt_text_id}" ); + unset( $this->texts[$row->bt_text_id] ); + $dirty = true; + } + } + + // Recompress the blob if necessary + if ( $dirty ) { + if ( !count( $this->texts ) ) { + // All have been moved already + if ( $originalCount > 1 ) { + // This is suspcious, make noise + $this->critical( "Warning: concurrent operation detected, are there two conflicting " . + "processes running, doing the same job?" ); + } + return; + } + $this->recompress(); + } + + // Insert the data into the destination cluster + $targetCluster = $this->parent->getTargetCluster(); + $store = $this->parent->store; + $targetDB = $store->getMaster( $targetCluster ); + $targetDB->clearFlag( DBO_TRX ); // we manage the transactions + $targetDB->begin(); + $baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) ); + + // Write the new URLs to the blob_tracking table + foreach ( $this->referrers as $textId => $hash ) { + $url = $baseUrl . '/' . $hash; + $dbw->update( 'blob_tracking', + array( 'bt_new_url' => $url ), + array( + 'bt_text_id' => $textId, + 'bt_moved' => 0, # Check for concurrent conflicting update + ), + __METHOD__ + ); + } + + $targetDB->commit(); + // Critical section here: interruption at this point causes blob duplication + // Reversing the order of the commits would cause data loss instead + $dbw->commit(); + + // Write the new URLs to the text table and set the moved flag + if ( !$this->parent->copyOnly ) { + foreach ( $this->referrers as $textId => $hash ) { + $url = $baseUrl . '/' . $hash; + $this->parent->moveTextRow( $textId, $url ); + } + } + } +} + diff --git a/maintenance/storage/testCompression.php b/maintenance/storage/testCompression.php new file mode 100644 index 00000000..9c96c9f8 --- /dev/null +++ b/maintenance/storage/testCompression.php @@ -0,0 +1,81 @@ +] [--start=] [--limit=] \n"; + exit( 1 ); +} + +$title = Title::newFromText( $args[0] ); +if ( isset( $options['start'] ) ) { + $start = wfTimestamp( TS_MW, strtotime( $options['start'] ) ); + echo "Starting from " . $wgLang->timeanddate( $start ) . "\n"; +} else { + $start = '19700101000000'; +} +if ( isset( $options['limit'] ) ) { + $limit = $options['limit']; + $untilHappy = false; +} else { + $limit = 1000; + $untilHappy = true; +} +$type = isset( $options['type'] ) ? $options['type'] : 'ConcatenatedGzipHistoryBlob'; + + +$dbr = wfGetDB( DB_SLAVE ); +$res = $dbr->select( + array( 'page', 'revision', 'text' ), + '*', + array( + 'page_namespace' => $title->getNamespace(), + 'page_title' => $title->getDBkey(), + 'page_id=rev_page', + 'rev_timestamp > ' . $dbr->addQuotes( $dbr->timestamp( $start ) ), + 'rev_text_id=old_id' + ), __FILE__, array( 'LIMIT' => $limit ) +); + +$blob = new $type; +$hashes = array(); +$keys = array(); +$uncompressedSize = 0; +$t = -microtime( true ); +foreach ( $res as $row ) { + $revision = new Revision( $row ); + $text = $revision->getText(); + $uncompressedSize += strlen( $text ); + $hashes[$row->rev_id] = md5( $text ); + $keys[$row->rev_id] = $blob->addItem( $text ); + if ( $untilHappy && !$blob->isHappy() ) { + break; + } +} + +$serialized = serialize( $blob ); +$t += microtime( true ); +#print_r( $blob->mDiffMap ); + +printf( "%s\nCompression ratio for %d revisions: %5.2f, %s -> %d\n", + $type, + count( $hashes ), + $uncompressedSize / strlen( $serialized ), + $wgLang->formatSize( $uncompressedSize ), + strlen( $serialized ) +); +printf( "Compression time: %5.2f ms\n", $t * 1000 ); + +$t = -microtime( true ); +$blob = unserialize( $serialized ); +foreach ( $keys as $id => $key ) { + $text = $blob->getItem( $key ); + if ( md5( $text ) != $hashes[$id] ) { + echo "Content hash mismatch for rev_id $id\n"; + #var_dump( $text ); + } +} +$t += microtime( true ); +printf( "Decompression time: %5.2f ms\n", $t * 1000 ); + diff --git a/maintenance/storage/trackBlobs.php b/maintenance/storage/trackBlobs.php new file mode 100644 index 00000000..b13faa00 --- /dev/null +++ b/maintenance/storage/trackBlobs.php @@ -0,0 +1,316 @@ + [... ]\n"; + echo "Adds blobs from a given ES cluster to the blob_tracking table\n"; + echo "Automatically deletes the tracking table and starts from the start again when restarted.\n"; + + exit( 1 ); +} +$tracker = new TrackBlobs( $args ); +$tracker->trackBlobs(); + +class TrackBlobs { + var $clusters, $textClause; + var $doBlobOrphans; + var $trackedBlobs = array(); + + var $batchSize = 1000; + var $reportingInterval = 10; + + function __construct( $clusters ) { + $this->clusters = $clusters; + if ( extension_loaded( 'gmp' ) ) { + $this->doBlobOrphans = true; + foreach ( $clusters as $cluster ) { + $this->trackedBlobs[$cluster] = gmp_init( 0 ); + } + } else { + echo "Warning: the gmp extension is needed to find orphan blobs\n"; + } + } + + function trackBlobs() { + $this->initTrackingTable(); + $this->trackRevisions(); + $this->trackOrphanText(); + if ( $this->doBlobOrphans ) { + $this->findOrphanBlobs(); + } + } + + function initTrackingTable() { + $dbw = wfGetDB( DB_MASTER ); + if ( $dbw->tableExists( 'blob_tracking' ) ) { + $dbw->query( 'DROP TABLE ' . $dbw->tableName( 'blob_tracking' ) ); + $dbw->query( 'DROP TABLE ' . $dbw->tableName( 'blob_orphans' ) ); + } + $dbw->sourceFile( dirname( __FILE__ ) . '/blob_tracking.sql' ); + } + + function getTextClause() { + if ( !$this->textClause ) { + $dbr = wfGetDB( DB_SLAVE ); + $this->textClause = ''; + foreach ( $this->clusters as $cluster ) { + if ( $this->textClause != '' ) { + $this->textClause .= ' OR '; + } + $this->textClause .= 'old_text LIKE ' . $dbr->addQuotes( $dbr->escapeLike( "DB://$cluster/" ) . '%' ); + } + } + return $this->textClause; + } + + function interpretPointer( $text ) { + if ( !preg_match( '!^DB://(\w+)/(\d+)(?:/([0-9a-fA-F]+)|)$!', $text, $m ) ) { + return false; + } + return array( + 'cluster' => $m[1], + 'id' => intval( $m[2] ), + 'hash' => isset( $m[3] ) ? $m[2] : null + ); + } + + /** + * Scan the revision table for rows stored in the specified clusters + */ + function trackRevisions() { + $dbw = wfGetDB( DB_MASTER ); + $dbr = wfGetDB( DB_SLAVE ); + + $textClause = $this->getTextClause(); + $startId = 0; + $endId = $dbr->selectField( 'revision', 'MAX(rev_id)', false, __METHOD__ ); + $batchesDone = 0; + $rowsInserted = 0; + + echo "Finding revisions...\n"; + + while ( true ) { + $res = $dbr->select( array( 'revision', 'text' ), + array( 'rev_id', 'rev_page', 'old_id', 'old_flags', 'old_text' ), + array( + 'rev_id > ' . $dbr->addQuotes( $startId ), + 'rev_text_id=old_id', + $textClause, + "old_flags LIKE '%external%'", + ), + __METHOD__, + array( + 'ORDER BY' => 'rev_id', + 'LIMIT' => $this->batchSize + ) + ); + if ( !$res->numRows() ) { + break; + } + + $insertBatch = array(); + foreach ( $res as $row ) { + $startId = $row->rev_id; + $info = $this->interpretPointer( $row->old_text ); + if ( !$info ) { + echo "Invalid DB:// URL in rev_id {$row->rev_id}\n"; + continue; + } + if ( !in_array( $info['cluster'], $this->clusters ) ) { + echo "Invalid cluster returned in SQL query: {$info['cluster']}\n"; + continue; + } + $insertBatch[] = array( + 'bt_page' => $row->rev_page, + 'bt_rev_id' => $row->rev_id, + 'bt_text_id' => $row->old_id, + 'bt_cluster' => $info['cluster'], + 'bt_blob_id' => $info['id'], + 'bt_cgz_hash' => $info['hash'] + ); + if ( $this->doBlobOrphans ) { + gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] ); + } + } + $dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ ); + $rowsInserted += count( $insertBatch ); + + ++$batchesDone; + if ( $batchesDone >= $this->reportingInterval ) { + $batchesDone = 0; + echo "$startId / $endId\n"; + wfWaitForSlaves( 5 ); + } + } + echo "Found $rowsInserted revisions\n"; + } + + /** + * Scan the text table for orphan text + * Orphan text here does not imply DB corruption -- deleted text tracked by the + * archive table counts as orphan for our purposes. + */ + function trackOrphanText() { + # Wait until the blob_tracking table is available in the slave + $dbw = wfGetDB( DB_MASTER ); + $dbr = wfGetDB( DB_SLAVE ); + $pos = $dbw->getMasterPos(); + $dbr->masterPosWait( $pos, 100000 ); + + $textClause = $this->getTextClause( $this->clusters ); + $startId = 0; + $endId = $dbr->selectField( 'text', 'MAX(old_id)', false, __METHOD__ ); + $rowsInserted = 0; + $batchesDone = 0; + + echo "Finding orphan text...\n"; + + # Scan the text table for orphan text + while ( true ) { + $res = $dbr->select( array( 'text', 'blob_tracking' ), + array( 'old_id', 'old_flags', 'old_text' ), + array( + 'old_id>' . $dbr->addQuotes( $startId ), + $textClause, + "old_flags LIKE '%external%'", + 'bt_text_id IS NULL' + ), + __METHOD__, + array( + 'ORDER BY' => 'old_id', + 'LIMIT' => $this->batchSize + ), + array( 'blob_tracking' => array( 'LEFT JOIN', 'bt_text_id=old_id' ) ) + ); + $ids = array(); + foreach ( $res as $row ) { + $ids[] = $row->old_id; + } + + if ( !$res->numRows() ) { + break; + } + + $insertBatch = array(); + foreach ( $res as $row ) { + $startId = $row->old_id; + $info = $this->interpretPointer( $row->old_text ); + if ( !$info ) { + echo "Invalid DB:// URL in old_id {$row->old_id}\n"; + continue; + } + if ( !in_array( $info['cluster'], $this->clusters ) ) { + echo "Invalid cluster returned in SQL query\n"; + continue; + } + + $insertBatch[] = array( + 'bt_page' => 0, + 'bt_rev_id' => 0, + 'bt_text_id' => $row->old_id, + 'bt_cluster' => $info['cluster'], + 'bt_blob_id' => $info['id'], + 'bt_cgz_hash' => $info['hash'] + ); + if ( $this->doBlobOrphans ) { + gmp_setbit( $this->trackedBlobs[$info['cluster']], $info['id'] ); + } + } + $dbw->insert( 'blob_tracking', $insertBatch, __METHOD__ ); + + $rowsInserted += count( $insertBatch ); + ++$batchesDone; + if ( $batchesDone >= $this->reportingInterval ) { + $batchesDone = 0; + echo "$startId / $endId\n"; + wfWaitForSlaves( 5 ); + } + } + echo "Found $rowsInserted orphan text rows\n"; + } + + /** + * Scan the blobs table for rows not registered in blob_tracking (and thus not + * registered in the text table). + * + * Orphan blobs are indicative of DB corruption. They are inaccessible and + * should probably be deleted. + */ + function findOrphanBlobs() { + if ( !extension_loaded( 'gmp' ) ) { + echo "Can't find orphan blobs, need bitfield support provided by GMP.\n"; + return; + } + + $dbw = wfGetDB( DB_MASTER ); + + foreach ( $this->clusters as $cluster ) { + echo "Searching for orphan blobs in $cluster...\n"; + $lb = wfGetLBFactory()->getExternalLB( $cluster ); + try { + $extDB = $lb->getConnection( DB_SLAVE ); + } catch ( DBConnectionError $e ) { + if ( strpos( $e->error, 'Unknown database' ) !== false ) { + echo "No database on $cluster\n"; + } else { + echo "Error on $cluster: " . $e->getMessage() . "\n"; + } + continue; + } + $startId = 0; + $batchesDone = 0; + $actualBlobs = gmp_init( 0 ); + $endId = $extDB->selectField( 'blobs', 'MAX(blob_id)', false, __METHOD__ ); + + // Build a bitmap of actual blob rows + while ( true ) { + $res = $extDB->select( 'blobs', + array( 'blob_id' ), + array( 'blob_id > ' . $extDB->addQuotes( $startId ) ), + __METHOD__, + array( 'LIMIT' => $this->batchSize, 'ORDER BY' => 'blob_id' ) + ); + + if ( !$res->numRows() ) { + break; + } + + foreach ( $res as $row ) { + gmp_setbit( $actualBlobs, $row->blob_id ); + } + $startId = $row->blob_id; + + ++$batchesDone; + if ( $batchesDone >= $this->reportingInterval ) { + $batchesDone = 0; + echo "$startId / $endId\n"; + } + } + + // Find actual blobs that weren't tracked by the previous passes + // This is a set-theoretic difference A \ B, or in bitwise terms, A & ~B + $orphans = gmp_and( $actualBlobs, gmp_com( $this->trackedBlobs[$cluster] ) ); + + // Traverse the orphan list + $insertBatch = array(); + $id = 0; + while ( true ) { + $id = gmp_scan1( $orphans, $id ); + if ( $id == -1 ) { + break; + } + $insertBatch[] = array( + 'bo_cluster' => $cluster, + 'bo_blob_id' => $id + ); + ++$id; + } + + // Insert the batch + echo "Found " . count( $insertBatch ) . " orphan(s) in $cluster\n"; + $dbw->insert( 'blob_orphans', $insertBatch, __METHOD__ ); + } + } +} -- cgit v1.2.2