summaryrefslogtreecommitdiff
path: root/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php')
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php430
1 files changed, 430 insertions, 0 deletions
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
new file mode 100644
index 00000000..f1e4b496
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
@@ -0,0 +1,430 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Decoder extends Protocol
+{
+ // {{{ functions
+ // {{{ public function produceResponse()
+
+ /**
+ * decode produce response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function produceResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('produce response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+
+ // parse data struct
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $topicCount = array_shift($topicCount);
+ $offset += 4;
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
+ $offset += 8;
+ $result[$topicName][$partitionId[1]] = array(
+ 'errCode' => $errCode[1],
+ 'offset' => $partitionOffset
+ );
+ }
+ }
+
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function fetchResponse()
+
+ /**
+ * decode fetch response
+ *
+ * @param string $data
+ * @access public
+ * @return Iterator
+ */
+ public function fetchResponse()
+ {
+ return new \Kafka\Protocol\Fetch\Topic($this->stream);
+ }
+
+ // }}}
+ // {{{ public function metadataResponse()
+
+ /**
+ * decode metadata response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function metadataResponse()
+ {
+ $result = array();
+ $broker = array();
+ $topic = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('metaData response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $brokerCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $brokerCount = isset($brokerCount[1]) ? $brokerCount[1] : 0;
+ for ($i = 0; $i < $brokerCount; $i++) {
+ $nodeId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $nodeId = $nodeId[1];
+ $offset += 4;
+ $hostNameLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 host name length
+ $hostNameLen = isset($hostNameLen[1]) ? $hostNameLen[1] : 0;
+ $offset += 2;
+ $hostName = substr($data, $offset, $hostNameLen);
+ $offset += $hostNameLen;
+ $port = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $broker[$nodeId] = array(
+ 'host' => $hostName,
+ 'port' => $port[1],
+ );
+ }
+
+ $topicMetaCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicMetaCount = isset($topicMetaCount[1]) ? $topicMetaCount[1] : 0;
+ for ($i = 0; $i < $topicMetaCount; $i++) {
+ $topicErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen[1]);
+ $offset += $topicLen[1];
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $topic[$topicName]['errCode'] = $topicErrCode[1];
+ $partitions = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionId = isset($partitionId[1]) ? $partitionId[1] : 0;
+ $offset += 4;
+ $leaderId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $repliasCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $repliasCount = isset($repliasCount[1]) ? $repliasCount[1] : 0;
+ $replias = array();
+ for ($z = 0; $z < $repliasCount; $z++) {
+ $repliaId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $replias[] = $repliaId[1];
+ }
+ $isrCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $isrCount = isset($isrCount[1]) ? $isrCount[1] : 0;
+ $isrs = array();
+ for ($z = 0; $z < $isrCount; $z++) {
+ $isrId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $isrs[] = $isrId[1];
+ }
+
+ $partitions[$partitionId] = array(
+ 'errCode' => $partitionErrCode[1],
+ 'leader' => $leaderId[1],
+ 'replicas' => $replias,
+ 'isr' => $isrs,
+ );
+ }
+ $topic[$topicName]['partitions'] = $partitions;
+ }
+
+ $result = array(
+ 'brokers' => $broker,
+ 'topics' => $topic,
+ );
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function offsetResponse()
+
+ /**
+ * decode offset response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function offsetResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('offset response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicCount = array_shift($topicCount);
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $offsetCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $offsetCount = array_shift($offsetCount);
+ $offsetArr = array();
+ for ($z = 0; $z < $offsetCount; $z++) {
+ $offsetArr[] = self::unpack(self::BIT_B64, substr($data, $offset, 8));
+ $offset += 8;
+ }
+ $result[$topicName][$partitionId[1]] = array(
+ 'errCode' => $errCode[1],
+ 'offset' => $offsetArr
+ );
+ }
+ }
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function commitOffsetResponse()
+
+ /**
+ * decode commit offset response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function commitOffsetResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('commit offset response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicCount = array_shift($topicCount);
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $result[$topicName][$partitionId[1]] = array(
+ 'errCode' => $errCode[1],
+ );
+ }
+ }
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function fetchOffsetResponse()
+
+ /**
+ * decode fetch offset response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function fetchOffsetResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('fetch offset response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicCount = array_shift($topicCount);
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
+ $offset += 8;
+ $metaLen = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $metaLen = array_shift($metaLen);
+ $offset += 2;
+ $metaData = '';
+ if ($metaLen) {
+ $metaData = substr($data, $offset, $metaLen);
+ $offset += $metaLen;
+ }
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $result[$topicName][$partitionId[1]] = array(
+ 'offset' => $partitionOffset,
+ 'metadata' => $metaData,
+ 'errCode' => $errCode[1],
+ );
+ }
+ }
+ return $result;
+ }
+
+ // }}}
+ // {{{ public static function getError()
+
+ /**
+ * get error
+ *
+ * @param integer $errCode
+ * @static
+ * @access public
+ * @return string
+ */
+ public static function getError($errCode)
+ {
+ $error = '';
+ switch($errCode) {
+ case 0:
+ $error = 'No error--it worked!';
+ break;
+ case -1:
+ $error = 'An unexpected server error';
+ break;
+ case 1:
+ $error = 'The requested offset is outside the range of offsets maintained by the server for the given topic/partition.';
+ break;
+ case 2:
+ $error = 'This indicates that a message contents does not match its CRC';
+ break;
+ case 3:
+ $error = 'This request is for a topic or partition that does not exist on this broker.';
+ break;
+ case 4:
+ $error = 'The message has a negative size';
+ break;
+ case 5:
+ $error = 'This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes';
+ break;
+ case 6:
+ $error = 'This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.';
+ break;
+ case 7:
+ $error = 'This error is thrown if the request exceeds the user-specified time limit in the request.';
+ break;
+ case 8:
+ $error = 'This is not a client facing error and is used only internally by intra-cluster broker communication.';
+ break;
+ case 10:
+ $error = 'The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum.';
+ break;
+ case 11:
+ $error = 'Internal error code for broker-to-broker communication.';
+ break;
+ case 12:
+ $error = 'If you specify a string larger than configured maximum for offset metadata';
+ break;
+ case 14:
+ $error = 'The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).';
+ break;
+ case 15:
+ $error = 'The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.';
+ break;
+ case 16:
+ $error = 'The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.';
+ break;
+ default:
+ $error = 'Unknown error';
+ }
+
+ return $error;
+ }
+
+ // }}}
+ // }}}
+}