summaryrefslogtreecommitdiff
path: root/vendor/ruflin/elastica/lib/Elastica/Bulk.php
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/ruflin/elastica/lib/Elastica/Bulk.php')
-rw-r--r--vendor/ruflin/elastica/lib/Elastica/Bulk.php442
1 files changed, 442 insertions, 0 deletions
diff --git a/vendor/ruflin/elastica/lib/Elastica/Bulk.php b/vendor/ruflin/elastica/lib/Elastica/Bulk.php
new file mode 100644
index 00000000..e7693dce
--- /dev/null
+++ b/vendor/ruflin/elastica/lib/Elastica/Bulk.php
@@ -0,0 +1,442 @@
+<?php
+namespace Elastica;
+
+use Elastica\Bulk\Action;
+use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction;
+use Elastica\Bulk\Response as BulkResponse;
+use Elastica\Bulk\ResponseSet;
+use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
+use Elastica\Exception\Bulk\UdpException;
+use Elastica\Exception\InvalidException;
+
+class Bulk
+{
+ const DELIMITER = "\n";
+
+ const UDP_DEFAULT_HOST = 'localhost';
+ const UDP_DEFAULT_PORT = 9700;
+
+ /**
+ * @var \Elastica\Client
+ */
+ protected $_client;
+
+ /**
+ * @var \Elastica\Bulk\Action[]
+ */
+ protected $_actions = array();
+
+ /**
+ * @var string
+ */
+ protected $_index = '';
+
+ /**
+ * @var string
+ */
+ protected $_type = '';
+
+ /**
+ * @var array request parameters to the bulk api
+ */
+ protected $_requestParams = array();
+
+ /**
+ * @param \Elastica\Client $client
+ */
+ public function __construct(Client $client)
+ {
+ $this->_client = $client;
+ }
+
+ /**
+ * @param string|\Elastica\Index $index
+ *
+ * @return $this
+ */
+ public function setIndex($index)
+ {
+ if ($index instanceof Index) {
+ $index = $index->getName();
+ }
+
+ $this->_index = (string) $index;
+
+ return $this;
+ }
+
+ /**
+ * @return string
+ */
+ public function getIndex()
+ {
+ return $this->_index;
+ }
+
+ /**
+ * @return bool
+ */
+ public function hasIndex()
+ {
+ return '' !== $this->getIndex();
+ }
+
+ /**
+ * @param string|\Elastica\Type $type
+ *
+ * @return $this
+ */
+ public function setType($type)
+ {
+ if ($type instanceof Type) {
+ $this->setIndex($type->getIndex()->getName());
+ $type = $type->getName();
+ }
+
+ $this->_type = (string) $type;
+
+ return $this;
+ }
+
+ /**
+ * @return string
+ */
+ public function getType()
+ {
+ return $this->_type;
+ }
+
+ /**
+ * @return bool
+ */
+ public function hasType()
+ {
+ return '' !== $this->_type;
+ }
+
+ /**
+ * @return string
+ */
+ public function getPath()
+ {
+ $path = '';
+ if ($this->hasIndex()) {
+ $path .= $this->getIndex().'/';
+ if ($this->hasType()) {
+ $path .= $this->getType().'/';
+ }
+ }
+ $path .= '_bulk';
+
+ return $path;
+ }
+
+ /**
+ * @param \Elastica\Bulk\Action $action
+ *
+ * @return $this
+ */
+ public function addAction(Action $action)
+ {
+ $this->_actions[] = $action;
+
+ return $this;
+ }
+
+ /**
+ * @param \Elastica\Bulk\Action[] $actions
+ *
+ * @return $this
+ */
+ public function addActions(array $actions)
+ {
+ foreach ($actions as $action) {
+ $this->addAction($action);
+ }
+
+ return $this;
+ }
+
+ /**
+ * @return \Elastica\Bulk\Action[]
+ */
+ public function getActions()
+ {
+ return $this->_actions;
+ }
+
+ /**
+ * @param \Elastica\Document $document
+ * @param string $opType
+ *
+ * @return $this
+ */
+ public function addDocument(Document $document, $opType = null)
+ {
+ $action = AbstractDocumentAction::create($document, $opType);
+
+ return $this->addAction($action);
+ }
+
+ /**
+ * @param \Elastica\Document[] $documents
+ * @param string $opType
+ *
+ * @return $this
+ */
+ public function addDocuments(array $documents, $opType = null)
+ {
+ foreach ($documents as $document) {
+ $this->addDocument($document, $opType);
+ }
+
+ return $this;
+ }
+
+ /**
+ * @param \Elastica\Script $data
+ * @param string $opType
+ *
+ * @return $this
+ */
+ public function addScript(Script $script, $opType = null)
+ {
+ $action = AbstractDocumentAction::create($script, $opType);
+
+ return $this->addAction($action);
+ }
+
+ /**
+ * @param \Elastica\Document[] $scripts
+ * @param string $opType
+ *
+ * @return $this
+ */
+ public function addScripts(array $scripts, $opType = null)
+ {
+ foreach ($scripts as $document) {
+ $this->addScript($document, $opType);
+ }
+
+ return $this;
+ }
+
+ /**
+ * @param \Elastica\Script|\Elastica\Document\array $data
+ * @param string $opType
+ *
+ * @return $this
+ */
+ public function addData($data, $opType = null)
+ {
+ if (!is_array($data)) {
+ $data = array($data);
+ }
+
+ foreach ($data as $actionData) {
+ if ($actionData instanceof Script) {
+ $this->addScript($actionData, $opType);
+ } elseif ($actionData instanceof Document) {
+ $this->addDocument($actionData, $opType);
+ } else {
+ throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
+ }
+ }
+
+ return $this;
+ }
+
+ /**
+ * @param array $data
+ *
+ * @throws \Elastica\Exception\InvalidException
+ *
+ * @return $this
+ */
+ public function addRawData(array $data)
+ {
+ foreach ($data as $row) {
+ if (is_array($row)) {
+ $opType = key($row);
+ $metadata = reset($row);
+ if (Action::isValidOpType($opType)) {
+ // add previous action
+ if (isset($action)) {
+ $this->addAction($action);
+ }
+ $action = new Action($opType, $metadata);
+ } elseif (isset($action)) {
+ $action->setSource($row);
+ $this->addAction($action);
+ $action = null;
+ } else {
+ throw new InvalidException('Invalid bulk data, source must follow action metadata');
+ }
+ } else {
+ throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
+ }
+ }
+
+ // add last action if available
+ if (isset($action)) {
+ $this->addAction($action);
+ }
+
+ return $this;
+ }
+
+ /**
+ * Set a url parameter on the request bulk request.
+ *
+ * @param string $name name of the parameter
+ * @param string $value value of the parameter
+ *
+ * @return $this
+ */
+ public function setRequestParam($name, $value)
+ {
+ $this->_requestParams[$name] = $value;
+
+ return $this;
+ }
+
+ /**
+ * Set the amount of time that the request will wait the shards to come on line.
+ * Requires Elasticsearch version >= 0.90.8.
+ *
+ * @param string $time timeout in Elasticsearch time format
+ *
+ * @return $this
+ */
+ public function setShardTimeout($time)
+ {
+ return $this->setRequestParam('timeout', $time);
+ }
+
+ /**
+ * @return string
+ */
+ public function __toString()
+ {
+ return $this->toString();
+ }
+
+ /**
+ * @return string
+ */
+ public function toString()
+ {
+ $data = '';
+ foreach ($this->getActions() as $action) {
+ $data .= $action->toString();
+ }
+
+ return $data;
+ }
+
+ /**
+ * @return array
+ */
+ public function toArray()
+ {
+ $data = array();
+ foreach ($this->getActions() as $action) {
+ foreach ($action->toArray() as $row) {
+ $data[] = $row;
+ }
+ }
+
+ return $data;
+ }
+
+ /**
+ * @return \Elastica\Bulk\ResponseSet
+ */
+ public function send()
+ {
+ $path = $this->getPath();
+ $data = $this->toString();
+
+ $response = $this->_client->request($path, Request::PUT, $data, $this->_requestParams);
+
+ return $this->_processResponse($response);
+ }
+
+ /**
+ * @param \Elastica\Response $response
+ *
+ * @throws \Elastica\Exception\Bulk\ResponseException
+ * @throws \Elastica\Exception\InvalidException
+ *
+ * @return \Elastica\Bulk\ResponseSet
+ */
+ protected function _processResponse(Response $response)
+ {
+ $responseData = $response->getData();
+
+ $actions = $this->getActions();
+
+ $bulkResponses = array();
+
+ if (isset($responseData['items']) && is_array($responseData['items'])) {
+ foreach ($responseData['items'] as $key => $item) {
+ if (!isset($actions[$key])) {
+ throw new InvalidException('No response found for action #'.$key);
+ }
+
+ $action = $actions[$key];
+
+ $opType = key($item);
+ $bulkResponseData = reset($item);
+
+ if ($action instanceof AbstractDocumentAction) {
+ $data = $action->getData();
+ if ($data instanceof Document && $data->isAutoPopulate()
+ || $this->_client->getConfigValue(array('document', 'autoPopulate'), false)
+ ) {
+ if (!$data->hasId() && isset($bulkResponseData['_id'])) {
+ $data->setId($bulkResponseData['_id']);
+ }
+ if (isset($bulkResponseData['_version'])) {
+ $data->setVersion($bulkResponseData['_version']);
+ }
+ }
+ }
+
+ $bulkResponses[] = new BulkResponse($bulkResponseData, $action, $opType);
+ }
+ }
+
+ $bulkResponseSet = new ResponseSet($response, $bulkResponses);
+
+ if ($bulkResponseSet->hasError()) {
+ throw new BulkResponseException($bulkResponseSet);
+ }
+
+ return $bulkResponseSet;
+ }
+
+ /**
+ * @param string $host
+ * @param int $port
+ *
+ * @throws \Elastica\Exception\Bulk\UdpException
+ */
+ public function sendUdp($host = null, $port = null)
+ {
+ if (null === $host) {
+ $host = $this->_client->getConfigValue(array('udp', 'host'), self::UDP_DEFAULT_HOST);
+ }
+ if (null === $port) {
+ $port = $this->_client->getConfigValue(array('udp', 'port'), self::UDP_DEFAULT_PORT);
+ }
+
+ $message = $this->toString();
+ $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
+ $result = socket_sendto($socket, $message, strlen($message), 0, $host, $port);
+ socket_close($socket);
+ if (false === $result) {
+ throw new UdpException('UDP request failed');
+ }
+ }
+}