summaryrefslogtreecommitdiff
path: root/vendor/nmred/kafka-php/src/Kafka
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/nmred/kafka-php/src/Kafka')
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Client.php290
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php53
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Consumer.php378
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Exception.php31
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Exception/NotSupported.php33
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Exception/OutOfRange.php33
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Exception/Protocol.php33
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Exception/Socket.php33
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Exception/SocketConnect.php33
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Exception/SocketEOF.php33
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Exception/SocketTimeout.php33
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Log.php78
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php200
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Offset.php305
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Produce.php337
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php430
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php652
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php119
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php39
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php117
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php160
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php71
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php175
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php269
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php375
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php345
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php230
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/Socket.php365
-rw-r--r--vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php364
29 files changed, 5614 insertions, 0 deletions
diff --git a/vendor/nmred/kafka-php/src/Kafka/Client.php b/vendor/nmred/kafka-php/src/Kafka/Client.php
new file mode 100644
index 00000000..a38e705b
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Client.php
@@ -0,0 +1,290 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Client
+{
+ // {{{ consts
+ // }}}
+ // {{{ members
+
+ /**
+ * cluster metadata
+ *
+ * @var \Kafka\ClusterMetaData
+ * @access private
+ */
+ private $metadata = null;
+
+ /**
+ * broker host list
+ *
+ * @var array
+ * @access private
+ */
+ private $hostList = array();
+
+ /**
+ * save broker connection
+ *
+ * @var array
+ * @access private
+ */
+ private static $stream = array();
+
+ /**
+ * default stream options
+ *
+ * @var array
+ * @access private
+ */
+ private $streamOptions = array(
+ 'RecvTimeoutSec' => 0,
+ 'RecvTimeoutUsec' => 750000,
+ 'SendTimeoutSec' => 0,
+ 'SendTimeoutUsec' => 100000,
+ );
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct(ClusterMetaData $metadata)
+ {
+ $this->metadata = $metadata;
+ if (method_exists($metadata, 'setClient')) {
+ $this->metadata->setClient($this);
+ }
+ }
+
+ /**
+ * update stream options
+ *
+ * @param array $options
+ */
+ public function setStreamOptions($options = array())
+ {
+ // Merge the arrays
+ $this->streamOptions = array_merge($this->streamOptions, $options);
+ $this->updateStreamOptions();
+ }
+
+ /**
+ * @access public
+ * @param $name - name of stream option
+ * @param $value - value for option
+ */
+ public function setStreamOption($name, $value)
+ {
+ $this->streamOptions[$name] = $value;
+ $this->updateStreamOptions();
+ }
+
+ /**
+ * @access public
+ * @param $name - name of option
+ * @return mixed
+ */
+ public function getStreamOption($name)
+ {
+ if (array_key_exists($name, $this->streamOptions)) {
+ return $this->streamOptions[$name];
+ }
+ return null;
+ }
+
+ /**
+ * @access private
+ */
+ private function updateStreamOptions()
+ {
+ // Loop thru each stream
+ foreach (self::$stream as $host => $streams) {
+ foreach ($streams as $key => $info) {
+ // Update options
+ if (isset($info['stream'])) {
+ /** @var \Kafka\Socket $stream */
+ $stream = $info['stream'];
+ $stream->setRecvTimeoutSec($this->streamOptions['RecvTimeoutSec']);
+ $stream->setRecvTimeoutUsec($this->streamOptions['SendTimeoutUsec']);
+ $stream->setSendTimeoutSec($this->streamOptions['SendTimeoutSec']);
+ $stream->setSendTimeoutUsec($this->streamOptions['SendTimeoutUsec']);
+ }
+ }
+ }
+ }
+
+ // }}}
+ // {{{ public function getBrokers()
+
+ /**
+ * get broker server
+ *
+ * @access public
+ * @return void
+ */
+ public function getBrokers()
+ {
+ if (empty($this->hostList)) {
+ $brokerList = $this->metadata->listBrokers();
+ foreach ($brokerList as $brokerId => $info) {
+ if (!isset($info['host']) || !isset($info['port'])) {
+ continue;
+ }
+ $this->hostList[$brokerId] = $info['host'] . ':' . $info['port'];
+ }
+ }
+
+ return $this->hostList;
+ }
+
+ // }}}
+ // {{{ public function getHostByPartition()
+
+ /**
+ * get broker host by topic partition
+ *
+ * @param string $topicName
+ * @param int $partitionId
+ * @access public
+ * @return string
+ */
+ public function getHostByPartition($topicName, $partitionId = 0)
+ {
+ $partitionInfo = $this->metadata->getPartitionState($topicName, $partitionId);
+ if (!$partitionInfo) {
+ throw new \Kafka\Exception('topic:' . $topicName . ', partition id: ' . $partitionId . ' is not exists.');
+ }
+
+ $hostList = $this->getBrokers();
+ if (isset($partitionInfo['leader']) && isset($hostList[$partitionInfo['leader']])) {
+ return $hostList[$partitionInfo['leader']];
+ } else {
+ throw new \Kafka\Exception('can\'t find broker host.');
+ }
+ }
+
+ // }}}
+ // {{{ public function getZooKeeper()
+
+ /**
+ * get kafka zookeeper object
+ *
+ * @access public
+ * @return \Kafka\ZooKeeper
+ */
+ public function getZooKeeper()
+ {
+ if ($this->metadata instanceof \Kafka\ZooKeeper) {
+ return $this->metadata;
+ } else {
+ throw new \Kafka\Exception( 'ZooKeeper was not provided' );
+ }
+ }
+
+ // }}}
+ // {{{ public function getStream()
+
+ /**
+ * get broker broker connect
+ *
+ * @param string $host
+ * @access private
+ * @return void
+ */
+ public function getStream($host, $lockKey = null)
+ {
+ if (!$lockKey) {
+ $lockKey = uniqid($host);
+ }
+
+ list($hostname, $port) = explode(':', $host);
+ // find unlock stream
+ if (isset(self::$stream[$host])) {
+ foreach (self::$stream[$host] as $key => $info) {
+ if ($info['locked']) {
+ continue;
+ } else {
+ self::$stream[$host][$key]['locked'] = true;
+ $info['stream']->connect();
+ return array('key' => $key, 'stream' => $info['stream']);
+ }
+ }
+ }
+
+ // no idle stream
+ $stream = new \Kafka\Socket($hostname, $port, $this->getStreamOption('RecvTimeoutSec'), $this->getStreamOption('RecvTimeoutUsec'), $this->getStreamOption('SendTimeoutSec'), $this->getStreamOption('SendTimeoutUsec'));
+ $stream->connect();
+ self::$stream[$host][$lockKey] = array(
+ 'locked' => true,
+ 'stream' => $stream,
+ );
+ return array('key' => $lockKey, 'stream' => $stream);
+ }
+
+ // }}}
+ // {{{ public function freeStream()
+
+ /**
+ * free stream pool
+ *
+ * @param string $key
+ * @access public
+ * @return void
+ */
+ public function freeStream($key)
+ {
+ foreach (self::$stream as $host => $values) {
+ if (isset($values[$key])) {
+ self::$stream[$host][$key]['locked'] = false;
+ }
+ }
+ }
+
+ // }}}
+ // {{{ public function getTopicDetail()
+
+ /**
+ * get topic detail info
+ *
+ * @param string $topicName
+ * @return array
+ */
+ public function getTopicDetail($topicName)
+ {
+ return $this->metadata->getTopicDetail($topicName);
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php b/vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php
new file mode 100644
index 00000000..e9b3d064
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/ClusterMetaData.php
@@ -0,0 +1,53 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+/**
++------------------------------------------------------------------------------
+* Metadata about the kafka cluster
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author ebernhardson@wikimedia.org
++------------------------------------------------------------------------------
+*/
+
+interface ClusterMetaData
+{
+ /**
+ * get broker list from kafka metadata
+ *
+ * @access public
+ * @return array
+ */
+ public function listBrokers();
+
+ /**
+ * @param string $topicName
+ * @param integer $partitionId
+ * @access public
+ * @return array
+ */
+ public function getPartitionState($topicName, $partitionId = 0);
+
+ /**
+ * @param string $topicName
+ * @access public
+ * @return array
+ */
+ public function getTopicDetail($topicName);
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Consumer.php
new file mode 100644
index 00000000..5ff2d43f
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Consumer.php
@@ -0,0 +1,378 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Consumer
+{
+ // {{{ consts
+ // }}}
+ // {{{ members
+
+ /**
+ * client
+ *
+ * @var mixed
+ * @access private
+ */
+ private $client = null;
+
+ /**
+ * send message options cache
+ *
+ * @var array
+ * @access private
+ */
+ private $payload = array();
+
+ /**
+ * consumer group
+ *
+ * @var string
+ * @access private
+ */
+ private $group = '';
+
+ /**
+ * from offset
+ *
+ * @var mixed
+ * @access private
+ */
+ private $fromOffset = true;
+
+ /**
+ * produce instance
+ *
+ * @var \Kafka\Produce
+ * @access private
+ */
+ private static $instance = null;
+
+ /**
+ * broker host list
+ *
+ * @var array
+ * @access private
+ */
+ private $hostList = array();
+
+ /**
+ * save broker connection
+ *
+ * @var array
+ * @access private
+ */
+ private $stream = array();
+
+ /**
+ * maxSize
+ *
+ * @var integer
+ */
+ private $maxSize = 1048576;
+
+ /**
+ * offsetStrategy
+ * @var integer
+ */
+ private $offsetStrategy = \Kafka\Offset::DEFAULT_EARLY;
+
+ // }}}
+ // {{{ functions
+ // {{{ public function static getInstance()
+
+ /**
+ * set send messages
+ *
+ * @access public
+ * @return void
+ */
+ public static function getInstance($hostList, $timeout = null)
+ {
+ if (is_null(self::$instance)) {
+ self::$instance = new self($hostList, $timeout);
+ }
+
+ return self::$instance;
+ }
+
+ // }}}
+ // {{{ private function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ private function __construct($hostList, $timeout = null)
+ {
+ $zookeeper = new \Kafka\ZooKeeper($hostList, $timeout);
+ $this->client = new \Kafka\Client($zookeeper);
+ }
+
+ // }}}
+ // {{{ public function clearPayload()
+
+ /**
+ * clearPayload
+ *
+ * @access public
+ * @return void
+ */
+ public function clearPayload()
+ {
+ $this->payload = array();
+ }
+
+ // }}}
+ // {{{ public function setTopic()
+
+ /**
+ * set topic name
+ *
+ * @access public
+ * @return void
+ */
+ public function setTopic($topicName, $defaultOffset = null)
+ {
+ $parts = $this->client->getTopicDetail($topicName);
+ if (!isset($parts['partitions']) || empty($parts['partitions'])) {
+ // set topic fail.
+ return $this;
+ }
+
+ foreach ($parts['partitions'] as $partId => $info) {
+ $this->setPartition($topicName, $partId, $defaultOffset);
+ }
+
+ return $this;
+ }
+
+ // }}}
+ // {{{ public function setPartition()
+
+ /**
+ * set topic partition
+ *
+ * @access public
+ * @return void
+ */
+ public function setPartition($topicName, $partitionId = 0, $offset = null)
+ {
+ if (is_null($offset)) {
+ if ($this->fromOffset) {
+ $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId);
+ $offset = $offsetObject->getOffset($this->offsetStrategy);
+ \Kafka\Log::log('topic name:' . $topicName . ', part:' . $partitionId . 'get offset from kafka server, offet:' . $offset, LOG_DEBUG);
+ } else {
+ $offset = 0;
+ }
+ }
+ $this->payload[$topicName][$partitionId] = $offset;
+
+ return $this;
+ }
+
+ // }}}
+ // {{{ public function setFromOffset()
+
+ /**
+ * set whether starting offset fetch
+ *
+ * @param boolean $fromOffset
+ * @access public
+ * @return void
+ */
+ public function setFromOffset($fromOffset)
+ {
+ $this->fromOffset = (boolean) $fromOffset;
+ }
+
+ // }}}
+ // {{{ public function setMaxBytes()
+
+ /**
+ * set fetch message max bytes
+ *
+ * @param int $maxSize
+ * @access public
+ * @return void
+ */
+ public function setMaxBytes($maxSize)
+ {
+ $this->maxSize = $maxSize;
+ }
+
+ // }}}
+ // {{{ public function setGroup()
+
+ /**
+ * set consumer group
+ *
+ * @param string $group
+ * @access public
+ * @return void
+ */
+ public function setGroup($group)
+ {
+ $this->group = (string) $group;
+ return $this;
+ }
+
+ // }}}
+ // {{{ public function fetch()
+
+ /**
+ * fetch message to broker
+ *
+ * @access public
+ * @return void
+ */
+ public function fetch()
+ {
+ $data = $this->_formatPayload();
+ if (empty($data)) {
+ return false;
+ }
+
+ $responseData = array();
+ $streams = array();
+ foreach ($data as $host => $requestData) {
+ $connArr = $this->client->getStream($host);
+ $conn = $connArr['stream'];
+ $encoder = new \Kafka\Protocol\Encoder($conn);
+ $encoder->fetchRequest($requestData);
+ $streams[$connArr['key']] = $conn;
+ }
+
+ $fetch = new \Kafka\Protocol\Fetch\Topic($streams, $data);
+
+ // register fetch helper
+ $freeStream = new \Kafka\Protocol\Fetch\Helper\FreeStream($this->client);
+ $freeStream->setStreams($streams);
+ \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('freeStream', $freeStream);
+
+ // register partition commit offset
+ $commitOffset = new \Kafka\Protocol\Fetch\Helper\CommitOffset($this->client);
+ $commitOffset->setGroup($this->group);
+ \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('commitOffset', $commitOffset);
+
+ $updateConsumer = new \Kafka\Protocol\Fetch\Helper\Consumer($this);
+ \Kafka\Protocol\Fetch\Helper\Helper::registerHelper('updateConsumer', $updateConsumer);
+
+ return $fetch;
+ }
+
+ // }}}
+ // {{{ public function getClient()
+
+ /**
+ * get client object
+ *
+ * @access public
+ * @return void
+ */
+ public function getClient()
+ {
+ return $this->client;
+ }
+
+ /**
+ * passthru method to client for setting stream options
+ *
+ * @param array $options
+ */
+ public function setStreamOptions($options = array())
+ {
+ $this->client->setStreamOptions($options);
+ }
+
+ // }}}
+ // {{{ private function _formatPayload()
+
+ /**
+ * format payload array
+ *
+ * @access private
+ * @return array
+ */
+ private function _formatPayload()
+ {
+ if (empty($this->payload)) {
+ return array();
+ }
+
+ $data = array();
+ foreach ($this->payload as $topicName => $partitions) {
+ foreach ($partitions as $partitionId => $offset) {
+ $host = $this->client->getHostByPartition($topicName, $partitionId);
+ $data[$host][$topicName][$partitionId] = $offset;
+ }
+ }
+
+ $requestData = array();
+ foreach ($data as $host => $info) {
+ $topicData = array();
+ foreach ($info as $topicName => $partitions) {
+ $partitionData = array();
+ foreach ($partitions as $partitionId => $offset) {
+ $partitionData[] = array(
+ 'partition_id' => $partitionId,
+ 'offset' => $offset,
+ 'max_bytes' => $this->maxSize,
+ );
+ }
+ $topicData[] = array(
+ 'topic_name' => $topicName,
+ 'partitions' => $partitionData,
+ );
+ }
+
+ $requestData[$host] = array(
+ 'data' => $topicData,
+ );
+ }
+
+ return $requestData;
+ }
+
+ /**
+ * const LAST_OFFSET = -1;
+ * const EARLIEST_OFFSET = -2;
+ * const DEFAULT_LAST = -2;
+ * const DEFAULT_EARLY = -1;
+ * @param type $offsetStrategy
+ */
+ public function setOffsetStrategy($offsetStrategy)
+ {
+ $this->offsetStrategy = $offsetStrategy;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception.php b/vendor/nmred/kafka-php/src/Kafka/Exception.php
new file mode 100644
index 00000000..f336f1c3
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Exception.php
@@ -0,0 +1,31 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Exception extends \RuntimeException
+{
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/NotSupported.php b/vendor/nmred/kafka-php/src/Kafka/Exception/NotSupported.php
new file mode 100644
index 00000000..011129a2
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Exception/NotSupported.php
@@ -0,0 +1,33 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Exception;
+
+use \Kafka\Exception;
+
+/**
++------------------------------------------------------------------------------
+* Kafka php client exception
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class NotSupported extends \Exception
+{
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/OutOfRange.php b/vendor/nmred/kafka-php/src/Kafka/Exception/OutOfRange.php
new file mode 100644
index 00000000..374d1538
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Exception/OutOfRange.php
@@ -0,0 +1,33 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Exception;
+
+use \Kafka\Exception;
+
+/**
++------------------------------------------------------------------------------
+* Kafka php client exception
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class OutOfRange extends Exception
+{
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/Protocol.php b/vendor/nmred/kafka-php/src/Kafka/Exception/Protocol.php
new file mode 100644
index 00000000..6e213f05
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Exception/Protocol.php
@@ -0,0 +1,33 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Exception;
+
+use \Kafka\Exception;
+
+/**
++------------------------------------------------------------------------------
+* Kafka php client exception
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Protocol extends \Exception
+{
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/Socket.php b/vendor/nmred/kafka-php/src/Kafka/Exception/Socket.php
new file mode 100644
index 00000000..aca93e25
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Exception/Socket.php
@@ -0,0 +1,33 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Exception;
+
+use \Kafka\Exception;
+
+/**
++------------------------------------------------------------------------------
+* Kafka php client exception
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Socket extends \Exception
+{
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/SocketConnect.php b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketConnect.php
new file mode 100644
index 00000000..476b2a48
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketConnect.php
@@ -0,0 +1,33 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Exception;
+
+use \Kafka\Exception;
+
+/**
++------------------------------------------------------------------------------
+ * Kafka php client exception
++------------------------------------------------------------------------------
+ *
+ * @package
+ * @version $_SWANBR_VERSION_$
+ * @copyright Copyleft
+ * @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+ */
+
+class SocketConnect extends Socket
+{
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/SocketEOF.php b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketEOF.php
new file mode 100644
index 00000000..35e34e07
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketEOF.php
@@ -0,0 +1,33 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Exception;
+
+use \Kafka\Exception;
+
+/**
++------------------------------------------------------------------------------
+* Kafka php client exception
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class SocketEOF extends Exception
+{
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Exception/SocketTimeout.php b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketTimeout.php
new file mode 100644
index 00000000..b0d38c2c
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Exception/SocketTimeout.php
@@ -0,0 +1,33 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Exception;
+
+use \Kafka\Exception;
+
+/**
++------------------------------------------------------------------------------
+* Kafka php client exception
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class SocketTimeout extends Exception
+{
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Log.php b/vendor/nmred/kafka-php/src/Kafka/Log.php
new file mode 100644
index 00000000..481bfc3d
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Log.php
@@ -0,0 +1,78 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Log
+{
+ // {{{ consts
+ // }}}
+ // {{{ members
+
+ /**
+ * log
+ *
+ * @var mixed
+ * @access private
+ */
+ private static $log = null;
+
+ // }}}
+ // {{{ functions
+ // {{{ public static function setLog()
+
+ /**
+ * setLog
+ *
+ * @access public
+ * @return void
+ */
+ public static function setLog($log)
+ {
+ if ($log) {
+ self::$log = $log;
+ }
+ }
+
+ // }}}
+ // {{{ public static function log()
+
+ /**
+ * log
+ *
+ * @access public
+ * @return void
+ */
+ public static function log($message, $level = LOG_DEBUG)
+ {
+ if (self::$log && method_exists(self::$log, 'log')) {
+ self::$log->log($message, $level);
+ }
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php b/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php
new file mode 100644
index 00000000..9d2c613e
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/MetaDataFromKafka.php
@@ -0,0 +1,200 @@
+<?php
+
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+/**
++------------------------------------------------------------------------------
+* Cluster metadata provided by kafka
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author ebernhardson@wikimedia.org
++------------------------------------------------------------------------------
+*/
+
+class MetaDataFromKafka implements ClusterMetaData
+{
+ // {{{ consts
+ // }}}
+ // {{{ members
+
+ /**
+ * client
+ *
+ * @var \Kafka\Client
+ * @access private
+ */
+ private $client;
+
+ /**
+ * list of kafka brokers to get metadata from
+ *
+ * @var array
+ * @access private
+ */
+ private $hostList;
+
+ /**
+ * List of all kafka brokers
+ *
+ * @var array
+ * @access private
+ */
+ private $brokers = array();
+
+ /**
+ * List of all loaded topic metadata
+ *
+ * @var array
+ * @access private
+ */
+ private $topics = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * @var string|array $hostList List of kafka brokers to get metadata from
+ * @access public
+ */
+ public function __construct($hostList)
+ {
+ if (is_string($hostList)) { // support host list 127.0.0.1:9092,192.168.2.11:9092 form
+ $this->hostList = explode(',', $hostList);
+ } else {
+ $this->hostList = (array)$hostList;
+ }
+ // randomize the order of servers we collect metadata from
+ shuffle($this->hostList);
+ }
+
+ // }}}
+ // {{{ public function setClient()
+
+ /**
+ * @var \Kafka\Client $client
+ * @access public
+ * @return void
+ */
+ public function setClient(\Kafka\Client $client)
+ {
+ $this->client = $client;
+ }
+
+ // }}}
+ // {{{ public function listBrokers()
+
+ /**
+ * get broker list from kafka metadata
+ *
+ * @access public
+ * @return array
+ */
+ public function listBrokers()
+ {
+ if ($this->brokers === null) {
+ $this->loadBrokers();
+ }
+ return $this->brokers;
+ }
+
+ // }}}
+ // {{{ public function getPartitionState()
+
+ public function getPartitionState($topicName, $partitionId = 0)
+ {
+ if (!isset( $this->topics[$topicName] ) ) {
+ $this->loadTopicDetail(array($topicName));
+ }
+ if ( isset( $this->topics[$topicName]['partitions'][$partitionId] ) ) {
+ return $this->topics[$topicName]['partitions'][$partitionId];
+ } else {
+ return null;
+ }
+ }
+
+ // }}}
+ // {{{ public function getTopicDetail()
+
+ /**
+ *
+ * @param string $topicName
+ * @access public
+ * @return array
+ */
+ public function getTopicDetail($topicName)
+ {
+ if (!isset( $this->topics[$topicName] ) ) {
+ $this->loadTopicDetail(array($topicName));
+ }
+ if (isset( $this->topics[$topicName] ) ) {
+ return $this->topics[$topicName];
+ } else {
+ return array();
+ }
+ }
+
+ // }}}
+ // {{{ private function loadBrokers()
+
+ private function loadBrokers()
+ {
+ $this->brokers = array();
+ // not sure how to ask for only the brokers without a topic...
+ // just ask for a topic we don't care about
+ $this->loadTopicDetail(array('test'));
+ }
+
+ // }}}
+ // {{{ private function loadTopicDetail()
+
+ private function loadTopicDetail(array $topics)
+ {
+ if ($this->client === null) {
+ throw new \Kafka\Exception('client was not provided');
+ }
+ $response = null;
+ foreach ($this->hostList as $host) {
+ try {
+ $response = null;
+ $stream = $this->client->getStream($host);
+ $conn = $stream['stream'];
+ $encoder = new \Kafka\Protocol\Encoder($conn);
+ $encoder->metadataRequest($topics);
+ $decoder = new \Kafka\Protocol\Decoder($conn);
+ $response = $decoder->metadataResponse();
+ $this->client->freeStream($stream['key']);
+ break;
+ } catch (\Kafka\Exception $e) {
+ // keep trying
+ }
+ }
+ if ($response) {
+ // Merge arrays using "+" operator to preserve key (which are broker IDs)
+ // instead of array_merge (which reindex numeric keys)
+ $this->brokers = $response['brokers'] + $this->brokers;
+ $this->topics = array_merge($response['topics'], $this->topics);
+ } else {
+ throw new \Kafka\Exception('Could not connect to any kafka brokers');
+ }
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Offset.php b/vendor/nmred/kafka-php/src/Kafka/Offset.php
new file mode 100644
index 00000000..7ad3f9d8
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Offset.php
@@ -0,0 +1,305 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+use \Kafka\Log;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Offset
+{
+ // {{{ consts
+
+ /**
+ * receive the latest offset
+ */
+ const LAST_OFFSET = -1;
+
+ /**
+ * receive the earliest available offset.
+ */
+ const EARLIEST_OFFSET = -2;
+
+ /**
+ * function getOffset if read invalid value use latest offset instead of
+ */
+ const DEFAULT_LAST = -2;
+
+ /**
+ * function getOffset if read invalid value use earliest offset instead of
+ */
+ const DEFAULT_EARLY = -1;
+
+ // }}}
+ // {{{ members
+
+ /**
+ * client
+ *
+ * @var mixed
+ * @access private
+ */
+ private $client = null;
+
+ /**
+ * consumer group
+ *
+ * @var string
+ * @access private
+ */
+ private $groupId = '';
+
+ /**
+ * topic name
+ *
+ * @var string
+ * @access private
+ */
+ private $topicName = '';
+
+ /**
+ * topic partition id, default 0
+ *
+ * @var float
+ * @access private
+ */
+ private $partitionId = 0;
+
+ /**
+ * encoder
+ *
+ * @var mixed
+ * @access private
+ */
+ private $encoder = null;
+
+ /**
+ * decoder
+ *
+ * @var mixed
+ * @access private
+ */
+ private $decoder = null;
+
+ /**
+ * streamKey
+ *
+ * @var string
+ * @access private
+ */
+ private $streamKey = '';
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct($client, $groupId, $topicName, $partitionId = 0)
+ {
+ $this->client = $client;
+ $this->groupId = $groupId;
+ $this->topicName = $topicName;
+ $this->partitionId = $partitionId;
+
+ $host = $this->client->getHostByPartition($topicName, $partitionId);
+ $stream = $this->client->getStream($host);
+ $conn = $stream['stream'];
+ $this->streamKey = $stream['key'];
+ $this->encoder = new \Kafka\Protocol\Encoder($conn);
+ $this->decoder = new \Kafka\Protocol\Decoder($conn);
+ }
+
+ // }}}
+ // {{{ public function setOffset()
+
+ /**
+ * set consumer offset
+ *
+ * @param integer $offset
+ * @access public
+ * @return void
+ */
+ public function setOffset($offset)
+ {
+ $maxOffset = $this->getProduceOffset();
+ if ($offset > $maxOffset) {
+ throw new \Kafka\Exception('this offset is invalid. must less than max offset:' . $maxOffset);
+ }
+
+ $data = array(
+ 'group_id' => $this->groupId,
+ 'data' => array(
+ array(
+ 'topic_name' => $this->topicName,
+ 'partitions' => array(
+ array(
+ 'partition_id' => $this->partitionId,
+ 'offset' => $offset,
+ ),
+ ),
+ ),
+ ),
+ );
+
+ $topicName = $this->topicName;
+ $partitionId = $this->partitionId;
+
+ $this->encoder->commitOffsetRequest($data);
+ $result = $this->decoder->commitOffsetResponse();
+ $this->client->freeStream($this->streamKey);
+ if (!isset($result[$topicName][$partitionId]['errCode'])) {
+ throw new \Kafka\Exception('commit topic offset failed.');
+ }
+ if ($result[$topicName][$partitionId]['errCode'] != 0) {
+ throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode']));
+ }
+ }
+
+ // }}}
+ // {{{ public function getOffset()
+
+ /**
+ * get consumer offset
+ *
+ * @param integer $defaultOffset
+ * if defaultOffset -1 instead of early offset
+ * if defaultOffset -2 instead of last offset
+ * @access public
+ * @return void
+ */
+ public function getOffset($defaultOffset = self::DEFAULT_LAST)
+ {
+ $maxOffset = $this->getProduceOffset(self::LAST_OFFSET);
+ $minOffset = $this->getProduceOffset(self::EARLIEST_OFFSET);
+ $data = array(
+ 'group_id' => $this->groupId,
+ 'data' => array(
+ array(
+ 'topic_name' => $this->topicName,
+ 'partitions' => array(
+ array(
+ 'partition_id' => $this->partitionId,
+ ),
+ ),
+ ),
+ ),
+ );
+
+ $this->encoder->fetchOffsetRequest($data);
+ $result = $this->decoder->fetchOffsetResponse();
+ $this->client->freeStream($this->streamKey);
+
+ $topicName = $this->topicName;
+ $partitionId = $this->partitionId;
+ if (!isset($result[$topicName][$partitionId]['errCode'])) {
+ throw new \Kafka\Exception('fetch topic offset failed.');
+ }
+ if ($result[$topicName][$partitionId]['errCode'] == 3) {
+ switch ($defaultOffset) {
+ case self::DEFAULT_LAST:
+ return $maxOffset;
+ Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default last.", LOG_INFO);
+ case self::DEFAULT_EARLY:
+ Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default early.", LOG_INFO);
+ return $minOffset;
+ default:
+ $this->setOffset($defaultOffset);
+ Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is default $defaultOffset.", LOG_INFO);
+ return $defaultOffset;
+ }
+ if ($defaultOffset) {
+ $this->setOffset($defaultOffset);
+ return $defaultOffset;
+ }
+ } elseif ($result[$topicName][$partitionId]['errCode'] == 0) {
+ $offset = $result[$topicName][$partitionId]['offset'];
+ if ($offset > $maxOffset || $offset < $minOffset) {
+ if ($defaultOffset == self::DEFAULT_EARLY) {
+ $offset = $minOffset;
+ } else {
+ $offset = $maxOffset;
+ }
+ }
+ Log::log("topic name: $topicName, partitionId: $partitionId, get offset value is $offset.", LOG_INFO);
+
+ return $offset;
+ } else {
+ throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode']));
+ }
+ }
+
+ // }}}
+ // {{{ public function getProduceOffset()
+
+ /**
+ * get produce server offset
+ *
+ * @param string $topicName
+ * @param integer $partitionId
+ * @access public
+ * @return int
+ */
+ public function getProduceOffset($timeLine = self::LAST_OFFSET)
+ {
+ $topicName = $this->topicName;
+ $partitionId = $this->partitionId;
+
+ $requestData = array(
+ 'data' => array(
+ array(
+ 'topic_name' => $this->topicName,
+ 'partitions' => array(
+ array(
+ 'partition_id' => $this->partitionId,
+ 'time' => $timeLine,
+ 'max_offset' => 1,
+ ),
+ ),
+ ),
+ ),
+ );
+ $this->encoder->offsetRequest($requestData);
+ $result = $this->decoder->offsetResponse();
+ $this->client->freeStream($this->streamKey);
+
+ if (!isset($result[$topicName][$partitionId]['offset'])) {
+ if (isset($result[$topicName][$partitionId]['errCode'])) {
+ throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($result[$topicName][$partitionId]['errCode']));
+ } else {
+ throw new \Kafka\Exception('get offset failed. topic name:' . $this->topicName . ' partitionId: ' . $this->partitionId);
+ }
+ }
+
+ return array_shift($result[$topicName][$partitionId]['offset']);
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Produce.php b/vendor/nmred/kafka-php/src/Kafka/Produce.php
new file mode 100644
index 00000000..2b9e6cd3
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Produce.php
@@ -0,0 +1,337 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Produce
+{
+ // {{{ consts
+ // }}}
+ // {{{ members
+
+ /**
+ * client
+ *
+ * @var mixed
+ * @access private
+ */
+ private $client = null;
+
+ /**
+ * send message options cache
+ *
+ * @var array
+ * @access private
+ */
+ private $payload = array();
+
+ /**
+ * default the server will not send any response
+ *
+ * @var float
+ * @access private
+ */
+ private $requiredAck = 0;
+
+ /**
+ * default timeout is 100ms
+ *
+ * @var float
+ * @access private
+ */
+ private $timeout = 100;
+
+ /**
+ * produce instance
+ *
+ * @var \Kafka\Produce
+ * @access private
+ */
+ private static $instance = null;
+
+ /**
+ * broker host list
+ *
+ * @var array
+ * @access private
+ */
+ private $hostList = array();
+
+ /**
+ * save broker connection
+ *
+ * @var array
+ * @access private
+ */
+ private $stream = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function static getInstance()
+
+ /**
+ * set send messages
+ *
+ * @access public
+ * @return void
+ */
+ public static function getInstance($hostList, $timeout, $kafkaHostList = null)
+ {
+ if (is_null(self::$instance)) {
+ self::$instance = new self($hostList, $timeout, $kafkaHostList);
+ }
+
+ return self::$instance;
+ }
+
+ // }}}
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct($hostList, $timeout = null, $kafkaHostList = null)
+ {
+ if ($hostList instanceof \Kafka\ClusterMetaData) {
+ $metadata = $hostList;
+ } elseif ( $kafkaHostList !== null ) {
+ $metadata = new \Kafka\MetaDataFromKafka($kafkaHostList);
+ } else {
+ $metadata = new \Kafka\ZooKeeper($hostList, $timeout);
+ }
+ $this->client = new \Kafka\Client($metadata);
+ }
+
+ // }}}
+ // {{{ public function setMessages()
+
+ /**
+ * set send messages
+ *
+ * @access public
+ * @return void
+ */
+ public function setMessages($topicName, $partitionId = 0, $messages = array())
+ {
+ if (isset($this->payload[$topicName][$partitionId])) {
+ $this->payload[$topicName][$partitionId] =
+ array_merge($this->payload[$topicName][$partitionId], $messages);
+ } else {
+ $this->payload[$topicName][$partitionId] = $messages;
+ }
+
+ return $this;
+ }
+
+ // }}}
+ // {{{ public function setRequireAck()
+
+ /**
+ * set request mode
+ * This field indicates how many acknowledgements the servers should receive
+ * before responding to the request. If it is 0 the server will not send any
+ * response (this is the only case where the server will not reply to a
+ * request). If it is 1, the server will wait the data is written to the
+ * local log before sending a response. If it is -1 the server will block
+ * until the message is committed by all in sync replicas before sending a
+ * response. For any number > 1 the server will block waiting for this
+ * number of acknowledgements to occur (but the server will never wait for
+ * more acknowledgements than there are in-sync replicas).
+ *
+ * @param int $ack
+ * @access public
+ * @return void
+ */
+ public function setRequireAck($ack = 0)
+ {
+ if ($ack >= -1) {
+ $this->requiredAck = (int) $ack;
+ }
+
+ return $this;
+ }
+
+ // }}}
+ // {{{ public function setTimeOut()
+
+ /**
+ * set request timeout
+ *
+ * @param int $timeout
+ * @access public
+ * @return void
+ */
+ public function setTimeOut($timeout = 100)
+ {
+ if ((int) $timeout) {
+ $this->timeout = (int) $timeout;
+ }
+ return $this;
+ }
+
+ // }}}
+ // {{{ public function send()
+
+ /**
+ * send message to broker
+ *
+ * @access public
+ * @return void
+ */
+ public function send()
+ {
+ $data = $this->_formatPayload();
+ if (empty($data)) {
+ return false;
+ }
+
+ $responseData = array();
+ foreach ($data as $host => $requestData) {
+ $stream = $this->client->getStream($host);
+ $conn = $stream['stream'];
+ $encoder = new \Kafka\Protocol\Encoder($conn);
+ $encoder->produceRequest($requestData);
+ if ((int) $this->requiredAck !== 0) { // get broker response
+ $decoder = new \Kafka\Protocol\Decoder($conn);
+ $response = $decoder->produceResponse();
+ foreach ($response as $topicName => $info) {
+ if (!isset($responseData[$topicName])) {
+ $responseData[$topicName] = $info;
+ } else {
+ $responseData[$topicName] = array_merge($info, $responseData[$topicName]);
+ }
+ }
+ }
+
+ $this->client->freeStream($stream['key']);
+ }
+
+ $this->payload = array();
+ return $responseData;
+ }
+
+ // }}}
+ // {{{ public function getClient()
+
+ /**
+ * get client object
+ *
+ * @access public
+ * @return void
+ */
+ public function getClient()
+ {
+ return $this->client;
+ }
+
+ /**
+ * passthru method to client for setting stream options
+ *
+ * @access public
+ * @param array $options
+ */
+ public function setStreamOptions($options = array())
+ {
+ $this->client->setStreamOptions($options);
+ }
+
+ // }}}
+ // {{{ public function getAvailablePartitions()
+
+ /**
+ * get available partition
+ *
+ * @access public
+ * @return array
+ */
+ public function getAvailablePartitions($topicName)
+ {
+ $topicDetail = $this->client->getTopicDetail($topicName);
+ if (is_array($topicDetail) && isset($topicDetail['partitions'])) {
+ $topicPartitiions = array_keys($topicDetail['partitions']);
+ } else {
+ $topicPartitiions = array();
+ }
+
+ return $topicPartitiions;
+ }
+
+ // }}}
+ // {{{ private function _formatPayload()
+
+ /**
+ * format payload array
+ *
+ * @access private
+ * @return array
+ */
+ private function _formatPayload()
+ {
+ if (empty($this->payload)) {
+ return array();
+ }
+
+ $data = array();
+ foreach ($this->payload as $topicName => $partitions) {
+ foreach ($partitions as $partitionId => $messages) {
+ $host = $this->client->getHostByPartition($topicName, $partitionId);
+ $data[$host][$topicName][$partitionId] = $messages;
+ }
+ }
+
+ $requestData = array();
+ foreach ($data as $host => $info) {
+ $topicData = array();
+ foreach ($info as $topicName => $partitions) {
+ $partitionData = array();
+ foreach ($partitions as $partitionId => $messages) {
+ $partitionData[] = array(
+ 'partition_id' => $partitionId,
+ 'messages' => $messages,
+ );
+ }
+ $topicData[] = array(
+ 'topic_name' => $topicName,
+ 'partitions' => $partitionData,
+ );
+ }
+
+ $requestData[$host] = array(
+ 'required_ack' => $this->requiredAck,
+ 'timeout' => $this->timeout,
+ 'data' => $topicData,
+ );
+ }
+
+ return $requestData;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
new file mode 100644
index 00000000..f1e4b496
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Decoder.php
@@ -0,0 +1,430 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Decoder extends Protocol
+{
+ // {{{ functions
+ // {{{ public function produceResponse()
+
+ /**
+ * decode produce response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function produceResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('produce response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+
+ // parse data struct
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $topicCount = array_shift($topicCount);
+ $offset += 4;
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
+ $offset += 8;
+ $result[$topicName][$partitionId[1]] = array(
+ 'errCode' => $errCode[1],
+ 'offset' => $partitionOffset
+ );
+ }
+ }
+
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function fetchResponse()
+
+ /**
+ * decode fetch response
+ *
+ * @param string $data
+ * @access public
+ * @return Iterator
+ */
+ public function fetchResponse()
+ {
+ return new \Kafka\Protocol\Fetch\Topic($this->stream);
+ }
+
+ // }}}
+ // {{{ public function metadataResponse()
+
+ /**
+ * decode metadata response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function metadataResponse()
+ {
+ $result = array();
+ $broker = array();
+ $topic = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('metaData response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $brokerCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $brokerCount = isset($brokerCount[1]) ? $brokerCount[1] : 0;
+ for ($i = 0; $i < $brokerCount; $i++) {
+ $nodeId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $nodeId = $nodeId[1];
+ $offset += 4;
+ $hostNameLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 host name length
+ $hostNameLen = isset($hostNameLen[1]) ? $hostNameLen[1] : 0;
+ $offset += 2;
+ $hostName = substr($data, $offset, $hostNameLen);
+ $offset += $hostNameLen;
+ $port = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $broker[$nodeId] = array(
+ 'host' => $hostName,
+ 'port' => $port[1],
+ );
+ }
+
+ $topicMetaCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicMetaCount = isset($topicMetaCount[1]) ? $topicMetaCount[1] : 0;
+ for ($i = 0; $i < $topicMetaCount; $i++) {
+ $topicErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen[1]);
+ $offset += $topicLen[1];
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $topic[$topicName]['errCode'] = $topicErrCode[1];
+ $partitions = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionErrCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionId = isset($partitionId[1]) ? $partitionId[1] : 0;
+ $offset += 4;
+ $leaderId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $repliasCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $repliasCount = isset($repliasCount[1]) ? $repliasCount[1] : 0;
+ $replias = array();
+ for ($z = 0; $z < $repliasCount; $z++) {
+ $repliaId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $replias[] = $repliaId[1];
+ }
+ $isrCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $isrCount = isset($isrCount[1]) ? $isrCount[1] : 0;
+ $isrs = array();
+ for ($z = 0; $z < $isrCount; $z++) {
+ $isrId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $isrs[] = $isrId[1];
+ }
+
+ $partitions[$partitionId] = array(
+ 'errCode' => $partitionErrCode[1],
+ 'leader' => $leaderId[1],
+ 'replicas' => $replias,
+ 'isr' => $isrs,
+ );
+ }
+ $topic[$topicName]['partitions'] = $partitions;
+ }
+
+ $result = array(
+ 'brokers' => $broker,
+ 'topics' => $topic,
+ );
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function offsetResponse()
+
+ /**
+ * decode offset response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function offsetResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('offset response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicCount = array_shift($topicCount);
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $offsetCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $offsetCount = array_shift($offsetCount);
+ $offsetArr = array();
+ for ($z = 0; $z < $offsetCount; $z++) {
+ $offsetArr[] = self::unpack(self::BIT_B64, substr($data, $offset, 8));
+ $offset += 8;
+ }
+ $result[$topicName][$partitionId[1]] = array(
+ 'errCode' => $errCode[1],
+ 'offset' => $offsetArr
+ );
+ }
+ }
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function commitOffsetResponse()
+
+ /**
+ * decode commit offset response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function commitOffsetResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('commit offset response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicCount = array_shift($topicCount);
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $result[$topicName][$partitionId[1]] = array(
+ 'errCode' => $errCode[1],
+ );
+ }
+ }
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function fetchOffsetResponse()
+
+ /**
+ * decode fetch offset response
+ *
+ * @param string $data
+ * @access public
+ * @return array
+ */
+ public function fetchOffsetResponse()
+ {
+ $result = array();
+ $dataLen = self::unpack(self::BIT_B32, $this->stream->read(4, true));
+ $dataLen = array_shift($dataLen);
+ if (!$dataLen) {
+ throw new \Kafka\Exception\Protocol('fetch offset response invalid.');
+ }
+ $data = $this->stream->read($dataLen, true);
+ $offset = 4;
+ $topicCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $topicCount = array_shift($topicCount);
+ for ($i = 0; $i < $topicCount; $i++) {
+ $topicLen = self::unpack(self::BIT_B16, substr($data, $offset, 2)); // int16 topic name length
+ $topicLen = isset($topicLen[1]) ? $topicLen[1] : 0;
+ $offset += 2;
+ $topicName = substr($data, $offset, $topicLen);
+ $offset += $topicLen;
+ $partitionCount = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $partitionCount = isset($partitionCount[1]) ? $partitionCount[1] : 0;
+ $offset += 4;
+ $result[$topicName] = array();
+ for ($j = 0; $j < $partitionCount; $j++) {
+ $partitionId = self::unpack(self::BIT_B32, substr($data, $offset, 4));
+ $offset += 4;
+ $partitionOffset = self::unpack(self::BIT_B64, substr($data, $offset, 8));
+ $offset += 8;
+ $metaLen = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $metaLen = array_shift($metaLen);
+ $offset += 2;
+ $metaData = '';
+ if ($metaLen) {
+ $metaData = substr($data, $offset, $metaLen);
+ $offset += $metaLen;
+ }
+ $errCode = self::unpack(self::BIT_B16, substr($data, $offset, 2));
+ $offset += 2;
+ $result[$topicName][$partitionId[1]] = array(
+ 'offset' => $partitionOffset,
+ 'metadata' => $metaData,
+ 'errCode' => $errCode[1],
+ );
+ }
+ }
+ return $result;
+ }
+
+ // }}}
+ // {{{ public static function getError()
+
+ /**
+ * get error
+ *
+ * @param integer $errCode
+ * @static
+ * @access public
+ * @return string
+ */
+ public static function getError($errCode)
+ {
+ $error = '';
+ switch($errCode) {
+ case 0:
+ $error = 'No error--it worked!';
+ break;
+ case -1:
+ $error = 'An unexpected server error';
+ break;
+ case 1:
+ $error = 'The requested offset is outside the range of offsets maintained by the server for the given topic/partition.';
+ break;
+ case 2:
+ $error = 'This indicates that a message contents does not match its CRC';
+ break;
+ case 3:
+ $error = 'This request is for a topic or partition that does not exist on this broker.';
+ break;
+ case 4:
+ $error = 'The message has a negative size';
+ break;
+ case 5:
+ $error = 'This error is thrown if we are in the middle of a leadership election and there is currently no leader for this partition and hence it is unavailable for writes';
+ break;
+ case 6:
+ $error = 'This error is thrown if the client attempts to send messages to a replica that is not the leader for some partition. It indicates that the clients metadata is out of date.';
+ break;
+ case 7:
+ $error = 'This error is thrown if the request exceeds the user-specified time limit in the request.';
+ break;
+ case 8:
+ $error = 'This is not a client facing error and is used only internally by intra-cluster broker communication.';
+ break;
+ case 10:
+ $error = 'The server has a configurable maximum message size to avoid unbounded memory allocation. This error is thrown if the client attempt to produce a message larger than this maximum.';
+ break;
+ case 11:
+ $error = 'Internal error code for broker-to-broker communication.';
+ break;
+ case 12:
+ $error = 'If you specify a string larger than configured maximum for offset metadata';
+ break;
+ case 14:
+ $error = 'The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change for that offsets topic partition).';
+ break;
+ case 15:
+ $error = 'The broker returns this error code for consumer metadata requests or offset commit requests if the offsets topic has not yet been created.';
+ break;
+ case 16:
+ $error = 'The broker returns this error code if it receives an offset fetch or commit request for a consumer group that it is not a coordinator for.';
+ break;
+ default:
+ $error = 'Unknown error';
+ }
+
+ return $error;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php
new file mode 100644
index 00000000..7d36e10f
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Encoder.php
@@ -0,0 +1,652 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Encoder extends Protocol
+{
+ // {{{ functions
+ // {{{ public function produceRequest()
+
+ /**
+ * produce request
+ *
+ * @param array $payloads
+ * @static
+ * @access public
+ * @return void
+ */
+ public function produceRequest($payloads, $compression = self::COMPRESSION_NONE)
+ {
+ if (!isset($payloads['data'])) {
+ throw new \Kafka\Exception\Protocol('given procude data invalid. `data` is undefined.');
+ }
+
+ if (!isset($payloads['required_ack'])) {
+ // default server will not send any response
+ // (this is the only case where the server will not reply to a request)
+ $payloads['required_ack'] = 0;
+ }
+
+ if (!isset($payloads['timeout'])) {
+ $payloads['timeout'] = 100; // default timeout 100ms
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::PRODUCE_REQUEST);
+ $data = self::pack(self::BIT_B16, $payloads['required_ack']);
+ $data .= self::pack(self::BIT_B32, $payloads['timeout']);
+ $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeProcudeTopic'), $compression);
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public function metadataRequest()
+
+ /**
+ * build metadata request protocol
+ *
+ * @param array $topics
+ * @access public
+ * @return string
+ */
+ public function metadataRequest($topics)
+ {
+ if (!is_array($topics)) {
+ $topics = array($topics);
+ }
+
+ foreach ($topics as $topic) {
+ if (!is_string($topic)) {
+ throw new \Kafka\Exception\Protocol('request metadata topic array have invalid value. ');
+ }
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::METADATA_REQUEST);
+ $data = self::encodeArray($topics, array(__CLASS__, 'encodeString'), self::PACK_INT16);
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public function fetchRequest()
+
+ /**
+ * build fetch request
+ *
+ * @param array $payloads
+ * @access public
+ * @return string
+ */
+ public function fetchRequest($payloads)
+ {
+ if (!isset($payloads['data'])) {
+ throw new \Kafka\Exception\Protocol('given fetch kafka data invalid. `data` is undefined.');
+ }
+
+ if (!isset($payloads['replica_id'])) {
+ $payloads['replica_id'] = -1;
+ }
+
+ if (!isset($payloads['max_wait_time'])) {
+ $payloads['max_wait_time'] = 100; // default timeout 100ms
+ }
+
+ if (!isset($payloads['min_bytes'])) {
+ $payloads['min_bytes'] = 64 * 1024; // 64k
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::FETCH_REQUEST);
+ $data = self::pack(self::BIT_B32, $payloads['replica_id']);
+ $data .= self::pack(self::BIT_B32, $payloads['max_wait_time']);
+ $data .= self::pack(self::BIT_B32, $payloads['min_bytes']);
+ $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchTopic'));
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public function offsetRequest()
+
+ /**
+ * build offset request
+ *
+ * @param array $payloads
+ * @access public
+ * @return string
+ */
+ public function offsetRequest($payloads)
+ {
+ if (!isset($payloads['data'])) {
+ throw new \Kafka\Exception\Protocol('given offset data invalid. `data` is undefined.');
+ }
+
+ if (!isset($payloads['replica_id'])) {
+ $payloads['replica_id'] = -1;
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::OFFSET_REQUEST);
+ $data = self::pack(self::BIT_B32, $payloads['replica_id']);
+ $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeOffsetTopic'));
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public function commitOffsetRequest()
+
+ /**
+ * build consumer commit offset request
+ *
+ * @param array $payloads
+ * @access public
+ * @return string
+ */
+ public function commitOffsetRequest($payloads)
+ {
+ if (!isset($payloads['data'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `data` is undefined.');
+ }
+
+ if (!isset($payloads['group_id'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `group_id` is undefined.');
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::OFFSET_COMMIT_REQUEST);
+ $data = self::encodeString($payloads['group_id'], self::PACK_INT16);
+ $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeCommitOffset'));
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public function fetchOffsetRequest()
+
+ /**
+ * build consumer fetch offset request
+ *
+ * @param array $payloads
+ * @access public
+ * @return string
+ */
+ public function fetchOffsetRequest($payloads)
+ {
+ if (!isset($payloads['data'])) {
+ throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `data` is undefined.');
+ }
+
+ if (!isset($payloads['group_id'])) {
+ throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `group_id` is undefined.');
+ }
+
+ $header = self::requestHeader('kafka-php', 0, self::OFFSET_FETCH_REQUEST);
+ $data = self::encodeString($payloads['group_id'], self::PACK_INT16);
+ $data .= self::encodeArray($payloads['data'], array(__CLASS__, '_encodeFetchOffset'));
+ $data = self::encodeString($header . $data, self::PACK_INT32);
+
+ return $this->stream->write($data);
+ }
+
+ // }}}
+ // {{{ public static function encodeString()
+
+ /**
+ * encode pack string type
+ *
+ * @param string $string
+ * @param int $bytes self::PACK_INT32: int32 big endian order. self::PACK_INT16: int16 big endian order.
+ * @static
+ * @access public
+ * @return string
+ */
+ public static function encodeString($string, $bytes, $compression = self::COMPRESSION_NONE)
+ {
+ $packLen = ($bytes == self::PACK_INT32) ? self::BIT_B32 : self::BIT_B16;
+ switch ($compression) {
+ case self::COMPRESSION_NONE:
+ break;
+ case self::COMPRESSION_GZIP:
+ $string = gzencode($string);
+ break;
+ case self::COMPRESSION_SNAPPY:
+ throw new \Kafka\Exception\NotSupported('SNAPPY compression not yet implemented');
+ default:
+ throw new \Kafka\Exception\NotSupported('Unknown compression flag: ' . $compression);
+ }
+ return self::pack($packLen, strlen($string)) . $string;
+ }
+
+ // }}}
+ // {{{ public static function encodeArray()
+
+ /**
+ * encode key array
+ *
+ * @param array $array
+ * @param Callable $func
+ * @static
+ * @access public
+ * @return string
+ */
+ public static function encodeArray(array $array, $func, $options = null)
+ {
+ if (!is_callable($func, false)) {
+ throw new \Kafka\Exception\Protocol('Encode array failed, given function is not callable.');
+ }
+
+ $arrayCount = count($array);
+
+ $body = '';
+ foreach ($array as $value) {
+ if (!is_null($options)) {
+ $body .= call_user_func($func, $value, $options);
+ } else {
+ $body .= call_user_func($func, $value);
+ }
+ }
+
+ return self::pack(self::BIT_B32, $arrayCount) . $body;
+ }
+
+ // }}}
+ // {{{ public static function encodeMessageSet()
+
+ /**
+ * encode message set
+ * N.B., MessageSets are not preceded by an int32 like other array elements
+ * in the protocol.
+ *
+ * @param array $messages
+ * @static
+ * @access public
+ * @return string
+ */
+ public static function encodeMessageSet($messages, $compression = self::COMPRESSION_NONE)
+ {
+ if (!is_array($messages)) {
+ $messages = array($messages);
+ }
+
+ $data = '';
+ foreach ($messages as $message) {
+ $tmpMessage = self::_encodeMessage($message, $compression);
+
+ // int64 -- message offset Message
+ $data .= self::pack(self::BIT_B64, 0) . self::encodeString($tmpMessage, self::PACK_INT32);
+ }
+ return $data;
+ }
+
+ // }}}
+ // {{{ public static function requestHeader()
+
+ /**
+ * get request header
+ *
+ * @param string $clientId
+ * @param integer $correlationId
+ * @param integer $apiKey
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function requestHeader($clientId, $correlationId, $apiKey)
+ {
+ // int16 -- apiKey int16 -- apiVersion int32 correlationId
+ $binData = self::pack(self::BIT_B16, $apiKey);
+ $binData .= self::pack(self::BIT_B16, self::API_VERSION);
+ $binData .= self::pack(self::BIT_B32, $correlationId);
+
+ // concat client id
+ $binData .= self::encodeString($clientId, self::PACK_INT16);
+
+ return $binData;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeMessage()
+
+ /**
+ * encode signal message
+ *
+ * @param string $message
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeMessage($message, $compression = self::COMPRESSION_NONE)
+ {
+ // int8 -- magic int8 -- attribute
+ $data = self::pack(self::BIT_B8, self::MESSAGE_MAGIC);
+ $data .= self::pack(self::BIT_B8, $compression);
+
+ // message key
+ $data .= self::encodeString('', self::PACK_INT32);
+
+ // message value
+ $data .= self::encodeString($message, self::PACK_INT32, $compression);
+
+ $crc = crc32($data);
+
+ // int32 -- crc code string data
+ $message = self::pack(self::BIT_B32, $crc) . $data;
+
+ return $message;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeProcudePartion()
+
+ /**
+ * encode signal part
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeProcudePartion($values, $compression)
+ {
+ if (!isset($values['partition_id'])) {
+ throw new \Kafka\Exception\Protocol('given produce data invalid. `partition_id` is undefined.');
+ }
+
+ if (!isset($values['messages']) || empty($values['messages'])) {
+ throw new \Kafka\Exception\Protocol('given produce data invalid. `messages` is undefined.');
+ }
+
+ $data = self::pack(self::BIT_B32, $values['partition_id']);
+ $data .= self::encodeString(self::encodeMessageSet($values['messages'], $compression), self::PACK_INT32);
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeProcudeTopic()
+
+ /**
+ * encode signal topic
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeProcudeTopic($values, $compression)
+ {
+ if (!isset($values['topic_name'])) {
+ throw new \Kafka\Exception\Protocol('given produce data invalid. `topic_name` is undefined.');
+ }
+
+ if (!isset($values['partitions']) || empty($values['partitions'])) {
+ throw new \Kafka\Exception\Protocol('given produce data invalid. `partitions` is undefined.');
+ }
+
+ $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
+ $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeProcudePartion'), $compression);
+
+ return $topic . $partitions;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeFetchPartion()
+
+ /**
+ * encode signal part
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeFetchPartion($values)
+ {
+ if (!isset($values['partition_id'])) {
+ throw new \Kafka\Exception\Protocol('given fetch data invalid. `partition_id` is undefined.');
+ }
+
+ if (!isset($values['offset'])) {
+ $values['offset'] = 0;
+ }
+
+ if (!isset($values['max_bytes'])) {
+ $values['max_bytes'] = 100 * 1024 * 1024;
+ }
+
+ $data = self::pack(self::BIT_B32, $values['partition_id']);
+ $data .= self::pack(self::BIT_B64, $values['offset']);
+ $data .= self::pack(self::BIT_B32, $values['max_bytes']);
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeFetchTopic()
+
+ /**
+ * encode signal topic
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeFetchTopic($values)
+ {
+ if (!isset($values['topic_name'])) {
+ throw new \Kafka\Exception\Protocol('given fetch data invalid. `topic_name` is undefined.');
+ }
+
+ if (!isset($values['partitions']) || empty($values['partitions'])) {
+ throw new \Kafka\Exception\Protocol('given fetch data invalid. `partitions` is undefined.');
+ }
+
+ $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
+ $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchPartion'));
+
+ return $topic . $partitions;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeOffsetPartion()
+
+ /**
+ * encode signal part
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeOffsetPartion($values)
+ {
+ if (!isset($values['partition_id'])) {
+ throw new \Kafka\Exception\Protocol('given offset data invalid. `partition_id` is undefined.');
+ }
+
+ if (!isset($values['time'])) {
+ $values['time'] = -1; // -1
+ }
+
+ if (!isset($values['max_offset'])) {
+ $values['max_offset'] = 100000;
+ }
+
+ $data = self::pack(self::BIT_B32, $values['partition_id']);
+ $data .= self::pack(self::BIT_B64, $values['time']);
+ $data .= self::pack(self::BIT_B32, $values['max_offset']);
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeOffsetTopic()
+
+ /**
+ * encode signal topic
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeOffsetTopic($values)
+ {
+ if (!isset($values['topic_name'])) {
+ throw new \Kafka\Exception\Protocol('given offset data invalid. `topic_name` is undefined.');
+ }
+
+ if (!isset($values['partitions']) || empty($values['partitions'])) {
+ throw new \Kafka\Exception\Protocol('given offset data invalid. `partitions` is undefined.');
+ }
+
+ $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
+ $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeOffsetPartion'));
+
+ return $topic . $partitions;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeCommitOffsetPartion()
+
+ /**
+ * encode signal part
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeCommitOffsetPartion($values)
+ {
+ if (!isset($values['partition_id'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partition_id` is undefined.');
+ }
+
+ if (!isset($values['offset'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `offset` is undefined.');
+ }
+
+ if (!isset($values['time'])) {
+ $values['time'] = -1;
+ }
+
+ if (!isset($values['metadata'])) {
+ $values['metadata'] = 'm';
+ }
+
+ $data = self::pack(self::BIT_B32, $values['partition_id']);
+ $data .= self::pack(self::BIT_B64, $values['offset']);
+ $data .= self::pack(self::BIT_B64, $values['time']);
+ $data .= self::encodeString($values['metadata'], self::PACK_INT16);
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeCommitOffset()
+
+ /**
+ * encode signal topic
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeCommitOffset($values)
+ {
+ if (!isset($values['topic_name'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `topic_name` is undefined.');
+ }
+
+ if (!isset($values['partitions']) || empty($values['partitions'])) {
+ throw new \Kafka\Exception\Protocol('given commit offset data invalid. `partitions` is undefined.');
+ }
+
+ $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
+ $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeCommitOffsetPartion'));
+
+ return $topic . $partitions;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeFetchOffsetPartion()
+
+ /**
+ * encode signal part
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeFetchOffsetPartion($values)
+ {
+ if (!isset($values['partition_id'])) {
+ throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partition_id` is undefined.');
+ }
+
+ $data = self::pack(self::BIT_B32, $values['partition_id']);
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function _encodeFetchOffset()
+
+ /**
+ * encode signal topic
+ *
+ * @param partions
+ * @static
+ * @access protected
+ * @return string
+ */
+ protected static function _encodeFetchOffset($values)
+ {
+ if (!isset($values['topic_name'])) {
+ throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `topic_name` is undefined.');
+ }
+
+ if (!isset($values['partitions']) || empty($values['partitions'])) {
+ throw new \Kafka\Exception\Protocol('given fetch offset data invalid. `partitions` is undefined.');
+ }
+
+ $topic = self::encodeString($values['topic_name'], self::PACK_INT16);
+ $partitions = self::encodeArray($values['partitions'], array(__CLASS__, '_encodeFetchOffsetPartion'));
+
+ return $topic . $partitions;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php
new file mode 100644
index 00000000..8424cf78
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/CommitOffset.php
@@ -0,0 +1,119 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class CommitOffset extends HelperAbstract
+{
+ // {{{ members
+
+ /**
+ * consumer group
+ *
+ * @var string
+ * @access protected
+ */
+ protected $group = '';
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct($client)
+ {
+ $this->client = $client;
+ }
+
+ // }}}
+ // {{{ public function setGroup()
+
+ /**
+ * set consumer group
+ *
+ * @access public
+ * @return void
+ */
+ public function setGroup($group)
+ {
+ $this->group = $group;
+ }
+
+ // }}}
+ // {{{ public function onStreamEof()
+
+ /**
+ * on stream eof call
+ *
+ * @param string $streamKey
+ * @access public
+ * @return void
+ */
+ public function onStreamEof($streamKey)
+ {
+ }
+
+ // }}}
+ // {{{ public function onTopicEof()
+
+ /**
+ * on topic eof call
+ *
+ * @param string $topicName
+ * @access public
+ * @return void
+ */
+ public function onTopicEof($topicName)
+ {
+ }
+
+ // }}}
+ // {{{ public function onPartitionEof()
+
+ /**
+ * on partition eof call
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @access public
+ * @return void
+ */
+ public function onPartitionEof($partition)
+ {
+ $partitionId = $partition->key();
+ $topicName = $partition->getTopicName();
+ $offset = $partition->getMessageOffset();
+ $offsetObject = new \Kafka\Offset($this->client, $this->group, $topicName, $partitionId);
+ $offsetObject->setOffset($offset);
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
new file mode 100644
index 00000000..acf0223e
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
@@ -0,0 +1,39 @@
+<?php
+namespace Kafka\Protocol\Fetch\Helper;
+/**
+ * Description of Consumer
+ *
+ * @author daniel
+ */
+class Consumer extends HelperAbstract
+{
+ protected $consumer;
+
+ protected $offsetStrategy;
+
+
+ public function __construct(\Kafka\Consumer $consumer)
+ {
+ $this->consumer = $consumer;
+ }
+
+
+ public function onPartitionEof($partition)
+ {
+ $partitionId = $partition->key();
+ $topicName = $partition->getTopicName();
+ $offset = $partition->getMessageOffset();
+ $this->consumer->setFromOffset(true);
+ $this->consumer->setPartition($topicName, $partitionId, ($offset +1));
+ }
+
+ public function onStreamEof($streamKey)
+ {
+
+ }
+
+ public function onTopicEof($topicName)
+ {
+
+ }
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php
new file mode 100644
index 00000000..bba38dd6
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/FreeStream.php
@@ -0,0 +1,117 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class FreeStream extends HelperAbstract
+{
+ // {{{ members
+
+ /**
+ * streams
+ *
+ * @var array
+ * @access protected
+ */
+ protected $streams = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct($client)
+ {
+ $this->client = $client;
+ }
+
+ // }}}
+ // {{{ public function setStreams()
+
+ /**
+ * set streams
+ *
+ * @access public
+ * @return void
+ */
+ public function setStreams($streams)
+ {
+ $this->streams = $streams;
+ }
+
+ // }}}
+ // {{{ public function onStreamEof()
+
+ /**
+ * on stream eof call
+ *
+ * @param string $streamKey
+ * @access public
+ * @return void
+ */
+ public function onStreamEof($streamKey)
+ {
+ if (isset($this->streams[$streamKey])) {
+ $this->client->freeStream($streamKey);
+ }
+ }
+
+ // }}}
+ // {{{ public function onTopicEof()
+
+ /**
+ * on topic eof call
+ *
+ * @param string $topicName
+ * @access public
+ * @return void
+ */
+ public function onTopicEof($topicName)
+ {
+ }
+
+ // }}}
+ // {{{ public function onPartitionEof()
+
+ /**
+ * on partition eof call
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @access public
+ * @return void
+ */
+ public function onPartitionEof($partition)
+ {
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php
new file mode 100644
index 00000000..4ec23926
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Helper.php
@@ -0,0 +1,160 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Helper
+{
+ // {{{ members
+
+ /**
+ * helper object
+ */
+ private static $helpers = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public staitc function registerHelper()
+
+ /**
+ * register helper
+ *
+ * @param string $key
+ * @param \Kafka\Protocol\Fetch\Helper\HelperAbstract $helper
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function registerHelper($key, $helper = null)
+ {
+ if (is_null($helper)) {
+ $className = '\\Kafka\\Protocol\\Fetch\\Helper\\' . $key;
+ if (!class_exists($className)) {
+ throw new \Kafka\Exception('helper is not exists.');
+ }
+ $helper = new $className();
+ }
+
+ if ($helper instanceof \Kafka\Protocol\Fetch\Helper\HelperAbstract) {
+ self::$helpers[$key] = $helper;
+ } else {
+ throw new \Kafka\Exception('this helper not instance of `\Kafka\Protocol\Fetch\Helper\HelperAbstract`');
+ }
+ }
+
+ // }}}
+ // {{{ public staitc function unRegisterHelper()
+
+ /**
+ * unregister helper
+ *
+ * @param string $key
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function unRegisterHelper($key)
+ {
+ if (isset(self::$helpers[$key])) {
+ unset(self::$helpers[$key]);
+ }
+ }
+
+ // }}}
+ // {{{ public static function onStreamEof()
+
+ /**
+ * on stream eof call
+ *
+ * @param string $streamKey
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function onStreamEof($streamKey)
+ {
+ if (empty(self::$helpers)) {
+ return false;
+ }
+
+ foreach (self::$helpers as $key => $helper) {
+ if (method_exists($helper, 'onStreamEof')) {
+ $helper->onStreamEof($streamKey);
+ }
+ }
+ }
+
+ // }}}
+ // {{{ public static function onTopicEof()
+
+ /**
+ * on topic eof call
+ *
+ * @param string $topicName
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function onTopicEof($topicName)
+ {
+ if (empty(self::$helpers)) {
+ return false;
+ }
+
+ foreach (self::$helpers as $key => $helper) {
+ if (method_exists($helper, 'onTopicEof')) {
+ $helper->onStreamEof($topicName);
+ }
+ }
+ }
+
+ // }}}
+ // {{{ public static function onPartitionEof()
+
+ /**
+ * on partition eof call
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function onPartitionEof($partition)
+ {
+ if (empty(self::$helpers)) {
+ return false;
+ }
+
+ foreach (self::$helpers as $key => $helper) {
+ if (method_exists($helper, 'onPartitionEof')) {
+ $helper->onPartitionEof($partition);
+ }
+ }
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php
new file mode 100644
index 00000000..476f3da1
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/HelperAbstract.php
@@ -0,0 +1,71 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch\Helper;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+abstract class HelperAbstract
+{
+ // {{{ members
+ // }}}
+ // {{{ functions
+ // {{{ abstract public function onStreamEof()
+
+ /**
+ * on stream eof
+ *
+ * @param string $streamKey
+ * @access public
+ * @return void
+ */
+ abstract public function onStreamEof($streamKey);
+
+ // }}}
+ // {{{ abstract public function onTopicEof()
+
+ /**
+ * on topic eof
+ *
+ * @param string $topicName
+ * @access public
+ * @return void
+ */
+ abstract public function onTopicEof($topicName);
+
+ // }}}
+ // {{{ abstract public function onPartitionEof()
+
+ /**
+ * on partition eof
+ *
+ * @param \Kafka\Protocol\Fetch\Partition $partition
+ * @access public
+ * @return void
+ */
+ abstract public function onPartitionEof($partition);
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php
new file mode 100644
index 00000000..42d7da1d
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Message.php
@@ -0,0 +1,175 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Message
+{
+ // {{{ members
+
+ /**
+ * init read bytes
+ *
+ * @var float
+ * @access private
+ */
+ private $initOffset = 0;
+
+ /**
+ * validByteCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validByteCount = 0;
+
+ /**
+ * crc32 code
+ *
+ * @var float
+ * @access private
+ */
+ private $crc = 0;
+
+ /**
+ * This is a version id used to allow backwards compatible evolution of the
+ * message binary format.
+ *
+ * @var float
+ * @access private
+ */
+ private $magic = 0;
+
+ /**
+ * The lowest 2 bits contain the compression codec used for the message. The
+ * other bits should be set to 0.
+ *
+ * @var float
+ * @access private
+ */
+ private $attribute = 0;
+
+ /**
+ * message key
+ *
+ * @var string
+ * @access private
+ */
+ private $key = '';
+
+ /**
+ * message value
+ *
+ * @var string
+ * @access private
+ */
+ private $value = '';
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param string(raw) $msg
+ * @access public
+ * @return void
+ */
+ public function __construct($msg)
+ {
+ $offset = 0;
+ $crc = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
+ $offset += 4;
+ $this->crc = array_shift($crc);
+ $magic = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1));
+ $this->magic = array_shift($magic);
+ $offset += 1;
+ $attr = Decoder::unpack(Decoder::BIT_B8, substr($msg, $offset, 1));
+ $this->attribute = array_shift($attr);
+ $offset += 1;
+ $keyLen = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
+ $keyLen = array_shift($keyLen);
+ $offset += 4;
+ if ($keyLen > 0 && $keyLen != 0xFFFFFFFF) {
+ $this->key = substr($msg, $offset, $keyLen);
+ $offset += $keyLen;
+ }
+ $messageSize = Decoder::unpack(Decoder::BIT_B32, substr($msg, $offset, 4));
+ $messageSize = array_shift($messageSize);
+ $offset += 4;
+ if ($messageSize) {
+ $this->value = substr($msg, $offset, $messageSize);
+ }
+ }
+
+ // }}}
+ // {{{ public function getMessage()
+
+ /**
+ * get message data
+ *
+ * @access public
+ * @return string (raw)
+ */
+ public function getMessage()
+ {
+ return $this->value;
+ }
+
+ // }}}
+ // {{{ public function getMessageKey()
+
+ /**
+ * get message key
+ *
+ * @access public
+ * @return string (raw)
+ */
+ public function getMessageKey()
+ {
+ return $this->key;
+ }
+
+ // }}}
+ // {{{ public function __toString()
+
+ /**
+ * __toString
+ *
+ * @access public
+ * @return void
+ */
+ public function __toString()
+ {
+ return $this->value;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
new file mode 100644
index 00000000..50413b67
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/MessageSet.php
@@ -0,0 +1,269 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class MessageSet implements \Iterator
+{
+ // {{{ members
+
+ /**
+ * kafka socket object
+ *
+ * @var mixed
+ * @access private
+ */
+ private $stream = null;
+
+ /**
+ * messageSet size
+ *
+ * @var float
+ * @access private
+ */
+ private $messageSetSize = 0;
+
+ /**
+ * validByteCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validByteCount = 0;
+
+ /**
+ * messageSet offset
+ *
+ * @var float
+ * @access private
+ */
+ private $offset = 0;
+
+ /**
+ * valid
+ *
+ * @var mixed
+ * @access private
+ */
+ private $valid = false;
+
+ /**
+ * partition object
+ *
+ * @var \Kafka\Protocol\Fetch\Partition
+ * @access private
+ */
+ private $partition = null;
+
+ /**
+ * request fetch context
+ *
+ * @var array
+ */
+ private $context = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Socket $stream
+ * @param int $initOffset
+ * @access public
+ * @return void
+ */
+ public function __construct(\Kafka\Protocol\Fetch\Partition $partition, $context = array())
+ {
+ $this->stream = $partition->getStream();
+ $this->partition = $partition;
+ $this->context = $context;
+ $this->messageSetSize = $this->getMessageSetSize();
+ \Kafka\Log::log("messageSetSize: {$this->messageSetSize}", LOG_INFO);
+ }
+
+ // }}}
+ // {{{ public function current()
+
+ /**
+ * current
+ *
+ * @access public
+ * @return void
+ */
+ public function current()
+ {
+ return $this->current;
+ }
+
+ // }}}
+ // {{{ public function key()
+
+ /**
+ * key
+ *
+ * @access public
+ * @return void
+ */
+ public function key()
+ {
+ return $this->validByteCount;
+ }
+
+ // }}}
+ // {{{ public function rewind()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function rewind()
+ {
+ $this->valid = $this->loadNextMessage();
+ }
+
+ // }}}
+ // {{{ public function valid()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function valid()
+ {
+ if (!$this->valid) {
+ $this->partition->setMessageOffset($this->offset);
+
+ // one partition iterator end
+ \Kafka\Protocol\Fetch\Helper\Helper::onPartitionEof($this->partition);
+ }
+
+ return $this->valid;
+ }
+
+ // }}}
+ // {{{ public function next()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function next()
+ {
+ $this->valid = $this->loadNextMessage();
+ }
+
+ // }}}
+ // {{{ protected function getMessageSetSize()
+
+ /**
+ * get message set size
+ *
+ * @access protected
+ * @return integer
+ */
+ protected function getMessageSetSize()
+ {
+ // read message size
+ $data = $this->stream->read(4, true);
+ $data = Decoder::unpack(Decoder::BIT_B32, $data);
+ $size = array_shift($data);
+ if ($size <= 0) {
+ throw new \Kafka\Exception\OutOfRange($size . ' is not a valid message size');
+ }
+
+ return $size;
+ }
+
+ // }}}
+ // {{{ public function loadNextMessage()
+
+ /**
+ * load next message
+ *
+ * @access public
+ * @return void
+ */
+ public function loadNextMessage()
+ {
+ if ($this->validByteCount >= $this->messageSetSize) {
+ return false;
+ }
+
+ try {
+ if ($this->validByteCount + 12 > $this->messageSetSize) {
+ // read socket buffer dirty data
+ $this->stream->read($this->messageSetSize - $this->validByteCount);
+ return false;
+ }
+ $offset = $this->stream->read(8, true);
+ $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset);
+ $messageSize = $this->stream->read(4, true);
+ $messageSize = Decoder::unpack(Decoder::BIT_B32, $messageSize);
+ $messageSize = array_shift($messageSize);
+ $this->validByteCount += 12;
+ if (($this->validByteCount + $messageSize) > $this->messageSetSize) {
+ // read socket buffer dirty data
+ $this->stream->read($this->messageSetSize - $this->validByteCount);
+ return false;
+ }
+ $msg = $this->stream->read($messageSize, true);
+ $this->current = new Message($msg);
+ } catch (\Kafka\Exception $e) {
+ \Kafka\Log::log("already fetch: {$this->validByteCount}, {$e->getMessage()}", LOG_INFO);
+ return false;
+ }
+
+ $this->validByteCount += $messageSize;
+
+ return true;
+ }
+
+ // }}}
+ // {{{ public function messageOffset()
+
+ /**
+ * current message offset in producer
+ *
+ * @return void
+ */
+ public function messageOffset()
+ {
+ return $this->offset;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php
new file mode 100644
index 00000000..9f8578d5
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Partition.php
@@ -0,0 +1,375 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Partition implements \Iterator, \Countable
+{
+ // {{{ members
+
+ /**
+ * kafka socket object
+ *
+ * @var mixed
+ * @access private
+ */
+ private $stream = null;
+
+ /**
+ * validCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validCount = 0;
+
+ /**
+ * partitions count
+ *
+ * @var float
+ * @access private
+ */
+ private $partitionCount = false;
+
+ /**
+ * current topic
+ *
+ * @var mixed
+ * @access private
+ */
+ private $current = null;
+
+ /**
+ * current iterator key
+ * partition id
+ *
+ * @var string
+ * @access private
+ */
+ private $key = null;
+
+ /**
+ * partition errCode
+ *
+ * @var float
+ * @access private
+ */
+ private $errCode = 0;
+
+ /**
+ * partition offset
+ *
+ * @var float
+ * @access private
+ */
+ private $offset = 0;
+
+ /**
+ * partition current fetch offset
+ *
+ * @var float
+ * @access private
+ */
+ private $currentOffset = 0;
+
+ /**
+ * valid
+ *
+ * @var mixed
+ * @access private
+ */
+ private $valid = false;
+
+ /**
+ * cuerrent topic name
+ *
+ * @var string
+ * @access private
+ */
+ private $topicName = '';
+
+ /**
+ * request fetch context
+ *
+ * @var array
+ */
+ private $context = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Protocol\Fetch\Topic $topic
+ * @param int $initOffset
+ * @access public
+ * @return void
+ */
+ public function __construct(\Kafka\Protocol\Fetch\Topic $topic, $context = array())
+ {
+ $this->stream = $topic->getStream();
+ $this->topicName = $topic->key();
+ $this->context = $context;
+ $this->partitionCount = $this->getPartitionCount();
+ }
+
+ // }}}
+ // {{{ public function current()
+
+ /**
+ * current
+ *
+ * @access public
+ * @return void
+ */
+ public function current()
+ {
+ return $this->current;
+ }
+
+ // }}}
+ // {{{ public function key()
+
+ /**
+ * key
+ *
+ * @access public
+ * @return void
+ */
+ public function key()
+ {
+ return $this->key;
+ }
+
+ // }}}
+ // {{{ public function rewind()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function rewind()
+ {
+ $this->valid = $this->loadNextPartition();
+ }
+
+ // }}}
+ // {{{ public function valid()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function valid()
+ {
+ return $this->valid && $this->validCount <= $this->partitionCount;
+ }
+
+ // }}}
+ // {{{ public function next()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function next()
+ {
+ $this->valid = $this->loadNextPartition();
+ }
+
+ // }}}
+ // {{{ public function count()
+
+ /**
+ * implements Countable function
+ *
+ * @access public
+ * @return integer
+ */
+ public function count()
+ {
+ return $this->partitionCount;
+ }
+
+ // }}}
+ // {{{ public function getErrCode()
+
+ /**
+ * get partition errcode
+ *
+ * @access public
+ * @return void
+ */
+ public function getErrCode()
+ {
+ return $this->errCode;
+ }
+
+ // }}}
+ // {{{ public function getHighOffset()
+
+ /**
+ * get partition high offset
+ *
+ * @access public
+ * @return void
+ */
+ public function getHighOffset()
+ {
+ return $this->offset;
+ }
+
+ // }}}
+ // {{{ public function getTopicName()
+
+ /**
+ * get partition topic name
+ *
+ * @access public
+ * @return void
+ */
+ public function getTopicName()
+ {
+ return $this->topicName;
+ }
+
+ // }}}
+ // {{{ public function getStream()
+
+ /**
+ * get current stream
+ *
+ * @access public
+ * @return \Kafka\Socket
+ */
+ public function getStream()
+ {
+ return $this->stream;
+ }
+
+ // }}}
+ // {{{ protected function getPartitionCount()
+
+ /**
+ * get message size
+ * only use to object init
+ *
+ * @access protected
+ * @return integer
+ */
+ protected function getPartitionCount()
+ {
+ // read topic count
+ $data = $this->stream->read(4, true);
+ $data = Decoder::unpack(Decoder::BIT_B32, $data);
+ $count = array_shift($data);
+ if ($count <= 0) {
+ throw new \Kafka\Exception\OutOfRange($size . ' is not a valid partition count');
+ }
+
+ return $count;
+ }
+
+ // }}}
+ // {{{ public function loadNextPartition()
+
+ /**
+ * load next partition
+ *
+ * @access public
+ * @return void
+ */
+ public function loadNextPartition()
+ {
+ if ($this->validCount >= $this->partitionCount) {
+ return false;
+ }
+
+ try {
+ $partitionId = $this->stream->read(4, true);
+ $partitionId = Decoder::unpack(Decoder::BIT_B32, $partitionId);
+ $partitionId = array_shift($partitionId);
+ \Kafka\Log::log("kafka client:fetch partition:" . $partitionId, LOG_INFO);
+
+ $errCode = $this->stream->read(2, true);
+ $errCode = Decoder::unpack(Decoder::BIT_B16, $errCode);
+ $this->errCode = array_shift($errCode);
+ if ($this->errCode != 0) {
+ throw new \Kafka\Exception(\Kafka\Protocol\Decoder::getError($this->errCode));
+ }
+ $offset = $this->stream->read(8, true);
+ $this->offset = \Kafka\Protocol\Decoder::unpack(Decoder::BIT_B64, $offset);
+
+ $this->key = $partitionId;
+ $this->current = new MessageSet($this, $this->context);
+ } catch (\Kafka\Exception $e) {
+ return false;
+ }
+
+ $this->validCount++;
+ return true;
+ }
+
+ // }}}
+ // {{{ public function setMessageOffset()
+
+ /**
+ * set messageSet fetch offset current
+ *
+ * @param intger $offset
+ * @return void
+ */
+ public function setMessageOffset($offset)
+ {
+ $this->currentOffset = $offset;
+ }
+
+ // }}}
+ // {{{ public function getMessageOffset()
+
+ /**
+ * get messageSet fetch offset current
+ *
+ * @return int
+ */
+ public function getMessageOffset()
+ {
+ return $this->currentOffset;
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php
new file mode 100644
index 00000000..500e6b1f
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Topic.php
@@ -0,0 +1,345 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol\Fetch;
+
+use \Kafka\Protocol\Decoder;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Topic implements \Iterator, \Countable
+{
+ // {{{ members
+
+ /**
+ * kafka socket object
+ *
+ * @var array
+ * @access private
+ */
+ private $streams = array();
+
+ /**
+ * each topic count
+ *
+ * @var array
+ * @access private
+ */
+ private $topicCounts = array();
+
+ /**
+ * current iterator stream
+ *
+ * @var mixed
+ * @access private
+ */
+ private $currentStreamKey = 0;
+
+ /**
+ * current lock key
+ *
+ * @var string
+ * @access private
+ */
+ private $currentStreamLockKey = '';
+
+ /**
+ * currentStreamCount
+ *
+ * @var float
+ * @access private
+ */
+ private $currentStreamCount = 0;
+
+ /**
+ * validCount
+ *
+ * @var float
+ * @access private
+ */
+ private $validCount = 0;
+
+ /**
+ * topic count
+ *
+ * @var float
+ * @access private
+ */
+ private $topicCount = false;
+
+ /**
+ * current topic
+ *
+ * @var mixed
+ * @access private
+ */
+ private $current = null;
+
+ /**
+ * current iterator key
+ * topic name
+ *
+ * @var string
+ * @access private
+ */
+ private $key = null;
+
+ /**
+ * valid
+ *
+ * @var mixed
+ * @access private
+ */
+ private $valid = false;
+
+ /**
+ * request fetch context
+ *
+ * @var array
+ */
+ private $context = array();
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Socket $stream
+ * @param int $initOffset
+ * @access public
+ * @return void
+ */
+ public function __construct($streams, $context = array())
+ {
+ if (!is_array($streams)) {
+ $streams = array($streams);
+ }
+ $this->streams = $streams;
+ $topicInfos = array();
+ foreach ($context as $values) {
+ if (!isset($values['data'])) {
+ continue;
+ }
+
+ foreach ($values['data'] as $value) {
+ if (!isset($value['topic_name']) || !isset($value['partitions'])) {
+ continue;
+ }
+
+ $topicName = $value['topic_name'];
+ foreach ($value['partitions'] as $part) {
+ $topicInfos[$topicName][$part['partition_id']] = array(
+ 'offset' => $part['offset'],
+ );
+ }
+ }
+ }
+ $this->context = $topicInfos;
+ $this->topicCount = $this->getTopicCount();
+ }
+
+ // }}}
+ // {{{ public function current()
+
+ /**
+ * current
+ *
+ * @access public
+ * @return void
+ */
+ public function current()
+ {
+ return $this->current;
+ }
+
+ // }}}
+ // {{{ public function key()
+
+ /**
+ * key
+ *
+ * @access public
+ * @return void
+ */
+ public function key()
+ {
+ return $this->key;
+ }
+
+ // }}}
+ // {{{ public function rewind()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function rewind()
+ {
+ $this->valid = $this->loadNextTopic();
+ }
+
+ // }}}
+ // {{{ public function valid()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function valid()
+ {
+ return $this->valid;
+ }
+
+ // }}}
+ // {{{ public function next()
+
+ /**
+ * implements Iterator function
+ *
+ * @access public
+ * @return integer
+ */
+ public function next()
+ {
+ $this->valid = $this->loadNextTopic();
+ }
+
+ // }}}
+ // {{{ public function count()
+
+ /**
+ * implements Countable function
+ *
+ * @access public
+ * @return integer
+ */
+ public function count()
+ {
+ return $this->topicCount;
+ }
+
+ // }}}
+ // {{{ protected function getTopicCount()
+
+ /**
+ * get message size
+ * only use to object init
+ *
+ * @access protected
+ * @return integer
+ */
+ protected function getTopicCount()
+ {
+ $count = 0;
+ foreach (array_values($this->streams) as $key => $stream) {
+ // read topic count
+ $stream->read(8, true);
+ $data = $stream->read(4, true);
+ $data = Decoder::unpack(Decoder::BIT_B32, $data);
+ $topicCount = array_shift($data);
+ $count += $topicCount;
+ $this->topicCounts[$key] = $topicCount;
+ if ($count <= 0) {
+ throw new \Kafka\Exception\OutOfRange($count . ' is not a valid topic count');
+ }
+ }
+
+ return $count;
+ }
+
+ // }}}
+ // {{{ public function loadNextTopic()
+
+ /**
+ * load next topic
+ *
+ * @access public
+ * @return void
+ */
+ public function loadNextTopic()
+ {
+ if ($this->validCount >= $this->topicCount) {
+ \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey);
+ return false;
+ }
+
+ if ($this->currentStreamCount >= $this->topicCounts[$this->currentStreamKey]) {
+ \Kafka\Protocol\Fetch\Helper\Helper::onStreamEof($this->currentStreamLockKey);
+ $this->currentStreamKey++;
+ }
+
+ $lockKeys = array_keys($this->streams);
+ $streams = array_values($this->streams);
+ if (!isset($streams[$this->currentStreamKey])) {
+ return false;
+ }
+
+ $stream = $streams[$this->currentStreamKey];
+ $this->currentStreamLockKey = $lockKeys[$this->currentStreamKey];
+
+ try {
+ $topicLen = $stream->read(2, true);
+ $topicLen = Decoder::unpack(Decoder::BIT_B16, $topicLen);
+ $topicLen = array_shift($topicLen);
+ if ($topicLen <= 0) {
+ return false;
+ }
+
+ // topic name
+ $this->key = $stream->read($topicLen, true);
+ $this->current = new Partition($this, $this->context);
+ } catch (\Kafka\Exception $e) {
+ return false;
+ }
+
+ $this->validCount++;
+ $this->currentStreamCount++;
+
+ return true;
+ }
+
+ // }}}
+ // {{{ public function getStream()
+
+ /**
+ * get current stream
+ *
+ * @access public
+ * @return \Kafka\Socket
+ */
+ public function getStream()
+ {
+ $streams = array_values($this->streams);
+ return $streams[$this->currentStreamKey];
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php b/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php
new file mode 100644
index 00000000..a31067b5
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Protocol/Protocol.php
@@ -0,0 +1,230 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka\Protocol;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+abstract class Protocol
+{
+ // {{{ consts
+
+ /**
+ * Kafka server protocol version
+ */
+ const API_VERSION = 0;
+
+ /**
+ * use encode message, This is a version id used to allow backwards
+ * compatible evolution of the message binary format.
+ */
+ const MESSAGE_MAGIC = 0;
+
+ /**
+ * message no compression
+ */
+ const COMPRESSION_NONE = 0;
+
+ /**
+ * Message using gzip compression
+ */
+ const COMPRESSION_GZIP = 1;
+
+ /**
+ * Message using Snappy compression
+ */
+ const COMPRESSION_SNAPPY = 2;
+
+ /**
+ * pack int32 type
+ */
+ const PACK_INT32 = 0;
+
+ /**
+ * pack int16 type
+ */
+ const PACK_INT16 = 1;
+
+ /**
+ * protocol request code
+ */
+ const PRODUCE_REQUEST = 0;
+ const FETCH_REQUEST = 1;
+ const OFFSET_REQUEST = 2;
+ const METADATA_REQUEST = 3;
+ const OFFSET_COMMIT_REQUEST = 8;
+ const OFFSET_FETCH_REQUEST = 9;
+ const CONSUMER_METADATA_REQUEST = 10;
+
+ // unpack/pack bit
+ const BIT_B64 = 'N2';
+ const BIT_B32 = 'N';
+ const BIT_B16 = 'n';
+ const BIT_B8 = 'C';
+
+ // }}}
+ // {{{ members
+
+ /**
+ * stream
+ *
+ * @var mixed
+ * @access protected
+ */
+ protected $stream = null;
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @param \Kafka\Socket $stream
+ * @access public
+ * @return void
+ */
+ public function __construct(\Kafka\Socket $stream)
+ {
+ $this->stream = $stream;
+ }
+
+ // }}}
+ // {{{ public static function Khex2bin()
+
+ /**
+ * hex to bin
+ *
+ * @param string $string
+ * @static
+ * @access protected
+ * @return string (raw)
+ */
+ public static function Khex2bin($string)
+ {
+ if (function_exists('\hex2bin')) {
+ return \hex2bin($string);
+ } else {
+ $bin = '';
+ $len = strlen($string);
+ for ($i = 0; $i < $len; $i += 2) {
+ $bin .= pack('H*', substr($string, $i, 2));
+ }
+
+ return $bin;
+ }
+ }
+
+ // }}}
+ // {{{ public static function unpack()
+
+ /**
+ * Unpack a bit integer as big endian long
+ *
+ * @static
+ * @access public
+ * @return integer
+ */
+ public static function unpack($type, $bytes)
+ {
+ self::checkLen($type, $bytes);
+ if ($type == self::BIT_B64) {
+ $set = unpack($type, $bytes);
+ $original = ($set[1] & 0xFFFFFFFF) << 32 | ($set[2] & 0xFFFFFFFF);
+ return $original;
+ } else {
+ return unpack($type, $bytes);
+ }
+ }
+
+ // }}}
+ // {{{ public static function pack()
+
+ /**
+ * pack a bit integer as big endian long
+ *
+ * @static
+ * @access public
+ * @return integer
+ */
+ public static function pack($type, $data)
+ {
+ if ($type == self::BIT_B64) {
+ if ($data == -1) { // -1L
+ $data = self::Khex2bin('ffffffffffffffff');
+ } elseif ($data == -2) { // -2L
+ $data = self::Khex2bin('fffffffffffffffe');
+ } else {
+ $left = 0xffffffff00000000;
+ $right = 0x00000000ffffffff;
+
+ $l = ($data & $left) >> 32;
+ $r = $data & $right;
+ $data = pack($type, $l, $r);
+ }
+ } else {
+ $data = pack($type, $data);
+ }
+
+ return $data;
+ }
+
+ // }}}
+ // {{{ protected static function checkLen()
+
+ /**
+ * check unpack bit is valid
+ *
+ * @param string $type
+ * @param string(raw) $bytes
+ * @static
+ * @access protected
+ * @return void
+ */
+ protected static function checkLen($type, $bytes)
+ {
+ $len = 0;
+ switch($type) {
+ case self::BIT_B64:
+ $len = 8;
+ break;
+ case self::BIT_B32:
+ $len = 4;
+ break;
+ case self::BIT_B16:
+ $len = 2;
+ break;
+ case self::BIT_B8:
+ $len = 1;
+ break;
+ }
+
+ if (strlen($bytes) != $len) {
+ throw new \Kafka\Exception\Protocol('unpack failed. string(raw) length is ' . strlen($bytes) . ' , TO ' . $type);
+ }
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/Socket.php b/vendor/nmred/kafka-php/src/Kafka/Socket.php
new file mode 100644
index 00000000..be7321f3
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/Socket.php
@@ -0,0 +1,365 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class Socket
+{
+ // {{{ consts
+
+ const READ_MAX_LEN = 5242880; // read socket max length 5MB
+
+ // }}}
+ // {{{ members
+
+ /**
+ * Send timeout in seconds.
+ *
+ * @var float
+ * @access private
+ */
+ private $sendTimeoutSec = 0;
+
+ /**
+ * Send timeout in microseconds.
+ *
+ * @var float
+ * @access private
+ */
+ private $sendTimeoutUsec = 100000;
+
+ /**
+ * Recv timeout in seconds
+ *
+ * @var float
+ * @access private
+ */
+ private $recvTimeoutSec = 0;
+
+ /**
+ * Recv timeout in microseconds
+ *
+ * @var float
+ * @access private
+ */
+ private $recvTimeoutUsec = 750000;
+
+ /**
+ * Stream resource
+ *
+ * @var mixed
+ * @access private
+ */
+ private $stream = null;
+
+ /**
+ * Socket host
+ *
+ * @var mixed
+ * @access private
+ */
+ private $host = null;
+
+ /**
+ * Socket port
+ *
+ * @var mixed
+ * @access private
+ */
+ private $port = -1;
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct($host, $port, $recvTimeoutSec = 0, $recvTimeoutUsec = 750000, $sendTimeoutSec = 0, $sendTimeoutUsec = 100000)
+ {
+ $this->host = $host;
+ $this->port = $port;
+ $this->setRecvTimeoutSec($recvTimeoutSec);
+ $this->setRecvTimeoutUsec($recvTimeoutUsec);
+ $this->setSendTimeoutSec($sendTimeoutSec);
+ $this->setSendTimeoutUsec($sendTimeoutUsec);
+ }
+
+ /**
+ * @param float $sendTimeoutSec
+ */
+ public function setSendTimeoutSec($sendTimeoutSec)
+ {
+ $this->sendTimeoutSec = $sendTimeoutSec;
+ }
+
+ /**
+ * @param float $sendTimeoutUsec
+ */
+ public function setSendTimeoutUsec($sendTimeoutUsec)
+ {
+ $this->sendTimeoutUsec = $sendTimeoutUsec;
+ }
+
+ /**
+ * @param float $recvTimeoutSec
+ */
+ public function setRecvTimeoutSec($recvTimeoutSec)
+ {
+ $this->recvTimeoutSec = $recvTimeoutSec;
+ }
+
+ /**
+ * @param float $recvTimeoutUsec
+ */
+ public function setRecvTimeoutUsec($recvTimeoutUsec)
+ {
+ $this->recvTimeoutUsec = $recvTimeoutUsec;
+ }
+
+
+
+ // }}}
+ // {{{ public static function createFromStream()
+
+ /**
+ * Optional method to set the internal stream handle
+ *
+ * @static
+ * @access public
+ * @return void
+ */
+ public static function createFromStream($stream)
+ {
+ $socket = new self('localhost', 0);
+ $socket->setStream($stream);
+ return $socket;
+ }
+
+ // }}}
+ // {{{ public function setStream()
+
+ /**
+ * Optional method to set the internal stream handle
+ *
+ * @param mixed $stream
+ * @access public
+ * @return void
+ */
+ public function setStream($stream)
+ {
+ $this->stream = $stream;
+ }
+
+ // }}}
+ // {{{ public function connect()
+
+ /**
+ * Connects the socket
+ *
+ * @access public
+ * @return void
+ */
+ public function connect()
+ {
+ if (is_resource($this->stream)) {
+ return false;
+ }
+
+ if (empty($this->host)) {
+ throw new \Kafka\Exception('Cannot open null host.');
+ }
+ if ($this->port <= 0) {
+ throw new \Kafka\Exception('Cannot open without port.');
+ }
+
+ $this->stream = @fsockopen(
+ $this->host,
+ $this->port,
+ $errno,
+ $errstr,
+ $this->sendTimeoutSec + ($this->sendTimeoutUsec / 1000000)
+ );
+
+ if ($this->stream == false) {
+ $error = 'Could not connect to '
+ . $this->host . ':' . $this->port
+ . ' ('.$errstr.' ['.$errno.'])';
+ throw new \Kafka\Exception\SocketConnect($error);
+ }
+
+ stream_set_blocking($this->stream, 0);
+ }
+
+ // }}}
+ // {{{ public function close()
+
+ /**
+ * close the socket
+ *
+ * @access public
+ * @return void
+ */
+ public function close()
+ {
+ if (is_resource($this->stream)) {
+ fclose($this->stream);
+ }
+ }
+
+ // }}}
+ // {{{ public function read()
+
+ /**
+ * Read from the socket at most $len bytes.
+ *
+ * This method will not wait for all the requested data, it will return as
+ * soon as any data is received.
+ *
+ * @param integer $len Maximum number of bytes to read.
+ * @param boolean $verifyExactLength Throw an exception if the number of read bytes is less than $len
+ *
+ * @return string Binary data
+ * @throws Kafka_Exception_Socket
+ */
+ public function read($len, $verifyExactLength = false)
+ {
+ if ($len > self::READ_MAX_LEN) {
+ throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream, length too longer.');
+ }
+
+ $null = null;
+ $read = array($this->stream);
+ $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
+ if ($readable > 0) {
+ $remainingBytes = $len;
+ $data = $chunk = '';
+ while ($remainingBytes > 0) {
+ $chunk = fread($this->stream, $remainingBytes);
+ if ($chunk === false) {
+ $this->close();
+ throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
+ }
+ if (strlen($chunk) === 0) {
+ // Zero bytes because of EOF?
+ if (feof($this->stream)) {
+ $this->close();
+ throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
+ }
+ // Otherwise wait for bytes
+ $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
+ if ($readable !== 1) {
+ throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
+ }
+ continue; // attempt another read
+ }
+ $data .= $chunk;
+ $remainingBytes -= strlen($chunk);
+ }
+ if ($len === $remainingBytes || ($verifyExactLength && $len !== strlen($data))) {
+ // couldn't read anything at all OR reached EOF sooner than expected
+ $this->close();
+ throw new \Kafka\Exception\SocketEOF('Read ' . strlen($data) . ' bytes instead of the requested ' . $len . ' bytes');
+ }
+
+ return $data;
+ }
+ if (false !== $readable) {
+ $res = stream_get_meta_data($this->stream);
+ if (!empty($res['timed_out'])) {
+ $this->close();
+ throw new \Kafka\Exception\SocketTimeout('Timed out reading '.$len.' bytes from stream');
+ }
+ }
+ $this->close();
+ throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (not readable)');
+
+ }
+
+ // }}}
+ // {{{ public function write()
+
+ /**
+ * Write to the socket.
+ *
+ * @param string $buf The data to write
+ *
+ * @return integer
+ * @throws Kafka_Exception_Socket
+ */
+ public function write($buf)
+ {
+ $null = null;
+ $write = array($this->stream);
+
+ // fwrite to a socket may be partial, so loop until we
+ // are done with the entire buffer
+ $written = 0;
+ $buflen = strlen($buf);
+ while ( $written < $buflen ) {
+ // wait for stream to become available for writing
+ $writable = stream_select($null, $write, $null, $this->sendTimeoutSec, $this->sendTimeoutUsec);
+ if ($writable > 0) {
+ // write remaining buffer bytes to stream
+ $wrote = fwrite($this->stream, substr($buf, $written));
+ if ($wrote === -1 || $wrote === false) {
+ throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream, completed writing only ' . $written . ' bytes');
+ }
+ $written += $wrote;
+ continue;
+ }
+ if (false !== $writable) {
+ $res = stream_get_meta_data($this->stream);
+ if (!empty($res['timed_out'])) {
+ throw new \Kafka\Exception\SocketTimeout('Timed out writing ' . strlen($buf) . ' bytes to stream after writing ' . $written . ' bytes');
+ }
+ }
+ throw new \Kafka\Exception\Socket('Could not write ' . strlen($buf) . ' bytes to stream');
+ }
+ return $written;
+ }
+
+ // }}}
+ // {{{ public function rewind()
+
+ /**
+ * Rewind the stream
+ *
+ * @return void
+ */
+ public function rewind()
+ {
+ if (is_resource($this->stream)) {
+ rewind($this->stream);
+ }
+ }
+
+ // }}}
+ // }}}
+}
diff --git a/vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php b/vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php
new file mode 100644
index 00000000..f48b5cb9
--- /dev/null
+++ b/vendor/nmred/kafka-php/src/Kafka/ZooKeeper.php
@@ -0,0 +1,364 @@
+<?php
+/* vim: set expandtab tabstop=4 shiftwidth=4 softtabstop=4 foldmethod=marker: */
+// +---------------------------------------------------------------------------
+// | SWAN [ $_SWANBR_SLOGAN_$ ]
+// +---------------------------------------------------------------------------
+// | Copyright $_SWANBR_COPYRIGHT_$
+// +---------------------------------------------------------------------------
+// | Version $_SWANBR_VERSION_$
+// +---------------------------------------------------------------------------
+// | Licensed ( $_SWANBR_LICENSED_URL_$ )
+// +---------------------------------------------------------------------------
+// | $_SWANBR_WEB_DOMAIN_$
+// +---------------------------------------------------------------------------
+
+namespace Kafka;
+
+/**
++------------------------------------------------------------------------------
+* Kafka protocol since Kafka v0.8
++------------------------------------------------------------------------------
+*
+* @package
+* @version $_SWANBR_VERSION_$
+* @copyright Copyleft
+* @author $_SWANBR_AUTHOR_$
++------------------------------------------------------------------------------
+*/
+
+class ZooKeeper implements \Kafka\ClusterMetaData
+{
+ // {{{ consts
+
+ /**
+ * get all broker
+ */
+ const BROKER_PATH = '/brokers/ids';
+
+ /**
+ * get broker detail
+ */
+ const BROKER_DETAIL_PATH = '/brokers/ids/%d';
+
+ /**
+ * get topic detail
+ */
+ const TOPIC_PATCH = '/brokers/topics/%s';
+
+ /**
+ * get partition state
+ */
+ const PARTITION_STATE = '/brokers/topics/%s/partitions/%d/state';
+
+ /**
+ * register consumer
+ */
+ const REG_CONSUMER = '/consumers/%s/ids/%s';
+
+ /**
+ * list consumer
+ */
+ const LIST_CONSUMER = '/consumers/%s/ids';
+
+ /**
+ * partition owner
+ */
+ const PARTITION_OWNER = '/consumers/%s/owners/%s/%d';
+
+ // }}}
+ // {{{ members
+
+ /**
+ * zookeeper
+ *
+ * @var mixed
+ * @access private
+ */
+ private $zookeeper = null;
+
+ // }}}
+ // {{{ functions
+ // {{{ public function __construct()
+
+ /**
+ * __construct
+ *
+ * @access public
+ * @return void
+ */
+ public function __construct($hostList, $timeout = null)
+ {
+ if (!is_null($timeout) && is_numeric($timeout)) {
+ $this->zookeeper = new \ZooKeeper($hostList, null, $timeout);
+ } else {
+ $this->zookeeper = new \ZooKeeper($hostList);
+ }
+ }
+
+ // }}}
+ // {{{ public function listBrokers()
+
+ /**
+ * get broker list using zookeeper
+ *
+ * @access public
+ * @return array
+ */
+ public function listBrokers()
+ {
+ $result = array();
+ $lists = $this->zookeeper->getChildren(self::BROKER_PATH);
+ if (!empty($lists)) {
+ foreach ($lists as $brokerId) {
+ $brokerDetail = $this->getBrokerDetail($brokerId);
+ if (!$brokerDetail) {
+ continue;
+ }
+ $result[$brokerId] = $brokerDetail;
+ }
+ }
+
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function getBrokerDetail()
+
+ /**
+ * get broker detail
+ *
+ * @param integer $brokerId
+ * @access public
+ * @return void
+ */
+ public function getBrokerDetail($brokerId)
+ {
+ $result = array();
+ $path = sprintf(self::BROKER_DETAIL_PATH, (int) $brokerId);
+ if ($this->zookeeper->exists($path)) {
+ $result = $this->zookeeper->get($path);
+ if (!$result) {
+ return false;
+ }
+
+ $result = json_decode($result, true);
+ }
+
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function getTopicDetail()
+
+ /**
+ * get topic detail
+ *
+ * @param string $topicName
+ * @access public
+ * @return void
+ */
+ public function getTopicDetail($topicName)
+ {
+ $result = array();
+ $path = sprintf(self::TOPIC_PATCH, (string) $topicName);
+ if ($this->zookeeper->exists($path)) {
+ $result = $this->zookeeper->get($path);
+ if (!$result) {
+ return false;
+ }
+ $result = json_decode($result, true);
+ }
+
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function getPartitionState()
+
+ /**
+ * get partition state
+ *
+ * @param string $topicName
+ * @param integer $partitionId
+ * @access public
+ * @return void
+ */
+ public function getPartitionState($topicName, $partitionId = 0)
+ {
+ $result = array();
+ $path = sprintf(self::PARTITION_STATE, (string) $topicName, (int) $partitionId);
+ if ($this->zookeeper->exists($path)) {
+ $result = $this->zookeeper->get($path);
+ if (!$result) {
+ return false;
+ }
+ $result = json_decode($result, true);
+ }
+
+ return $result;
+ }
+
+ // }}}
+ // {{{ public function registerConsumer()
+
+ /**
+ * register consumer
+ *
+ * @param string $topicName
+ * @param integer $partitionId
+ * @access public
+ * @return void
+ */
+ public function registerConsumer($groupId, $consumerId, $topics = array())
+ {
+ if (empty($topics)) {
+ return true;
+ }
+
+ $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId);
+ $subData = array();
+ foreach ($topics as $topic) {
+ $subData[$topic] = 1;
+ }
+ $data = array(
+ 'version' => '1',
+ 'pattern' => 'white_list',
+ 'subscription' => $subData,
+ );
+ if (!$this->zookeeper->exists($path)) {
+ $this->makeZkPath($path);
+ $this->makeZkNode($path, json_encode($data));
+ } else {
+ $this->zookeeper->set($path, json_encode($data));
+ }
+ }
+
+ // }}}
+ // {{{ public function listConsumer()
+
+ /**
+ * list consumer
+ *
+ * @param string $groupId
+ * @access public
+ * @return void
+ */
+ public function listConsumer($groupId)
+ {
+ $path = sprintf(self::LIST_CONSUMER, (string) $groupId);
+ if (!$this->zookeeper->exists($path)) {
+ return array();
+ } else {
+ return $this->zookeeper->getChildren($path);
+ }
+ }
+
+ // }}}
+ // {{{ public function getConsumersPerTopic()
+
+ /**
+ * get consumer per topic
+ *
+ * @param string $groupId
+ * @access public
+ * @return array
+ */
+ public function getConsumersPerTopic($groupId)
+ {
+ $consumers = $this->listConsumer($groupId);
+ if (empty($consumers)) {
+ return array();
+ }
+
+ $topics = array();
+ foreach ($consumers as $consumerId) {
+ $path = sprintf(self::REG_CONSUMER, (string) $groupId, (string) $consumerId);
+ if (!$this->zookeeper->exists($path)) {
+ continue;
+ }
+
+ $info = $this->zookeeper->get($path);
+ $info = json_decode($info, true);
+ $subTopic = isset($info['subscription']) ? $info['subscription'] : array();
+ foreach ($subTopic as $topic => $num) {
+ $topics[$topic] = $consumerId;
+ }
+ }
+
+ return $topics;
+ }
+
+ // }}}
+ // {{{ public function addPartitionOwner()
+
+ /**
+ * add partition owner
+ *
+ * @param string $groupId
+ * @param string $topicName
+ * @param integer $partitionId
+ * @param string $consumerId
+ * @access public
+ * @return void
+ */
+ public function addPartitionOwner($groupId, $topicName, $partitionId, $consumerId)
+ {
+ $path = sprintf(self::PARTITION_OWNER, (string) $groupId, $topicName, (string) $partitionId);
+ if (!$this->zookeeper->exists($path)) {
+ $this->makeZkPath($path);
+ $this->makeZkNode($path, $consumerId);
+ } else {
+ $this->zookeeper->set($path, $consumerId);
+ }
+ }
+
+ // }}}
+ // {{{ protected function makeZkPath()
+
+ /**
+ * Equivalent of "mkdir -p" on ZooKeeper
+ *
+ * @param string $path The path to the node
+ * @param mixed $value The value to assign to each new node along the path
+ *
+ * @return bool
+ */
+ protected function makeZkPath($path, $value = 0)
+ {
+ $parts = explode('/', $path);
+ $parts = array_filter($parts);
+ $subpath = '';
+ while (count($parts) > 1) {
+ $subpath .= '/' . array_shift($parts);
+ if (!$this->zookeeper->exists($subpath)) {
+ $this->makeZkNode($subpath, $value);
+ }
+ }
+ }
+
+ // }}}
+ // {{{ protected function makeZkNode()
+
+ /**
+ * Create a node on ZooKeeper at the given path
+ *
+ * @param string $path The path to the node
+ * @param mixed $value The value to assign to the new node
+ *
+ * @return bool
+ */
+ protected function makeZkNode($path, $value)
+ {
+ $params = array(
+ array(
+ 'perms' => \Zookeeper::PERM_ALL,
+ 'scheme' => 'world',
+ 'id' => 'anyone',
+ )
+ );
+ return $this->zookeeper->create($path, $value, $params);
+ }
+
+ // }}}
+ // }}}
+}