summaryrefslogtreecommitdiff
path: root/vendor/nmred/kafka-php/src/Kafka/Protocol/Fetch/Helper/Consumer.php
blob: acf0223e662ada7525737b2b1adbb767ec7f9335 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
<?php
namespace Kafka\Protocol\Fetch\Helper;
/**
 * Description of Consumer
 *
 * @author daniel
 */
class Consumer extends HelperAbstract
{
    protected $consumer;

    protected $offsetStrategy;


    public function __construct(\Kafka\Consumer $consumer)
    {
        $this->consumer = $consumer;
    }


    public function onPartitionEof($partition)
    {
        $partitionId = $partition->key();
        $topicName = $partition->getTopicName();
        $offset    = $partition->getMessageOffset();
        $this->consumer->setFromOffset(true);
        $this->consumer->setPartition($topicName, $partitionId, ($offset +1));
    }

    public function onStreamEof($streamKey)
    {

    }

    public function onTopicEof($topicName)
    {

    }
}