Commit 9b79f9ef authored by Mark Harding's avatar Mark Harding
Browse files

Merge branch 'sprint/ii.es-retry-580' into 'master'

(feat): Retry queue for ElasticSearch

See merge request !250
parents 23c93fa4 5262cdb3
Loading
Loading
Loading
Loading
+8 −1
Original line number Original line Diff line number Diff line
@@ -1415,6 +1415,13 @@ CREATE TABLE minds.hidden_hashtags (
    PRIMARY KEY (hashtag)
    PRIMARY KEY (hashtag)
);
);


CREATE TABLE minds.search_dispatcher_queue (
    entity_urn text,
    last_retry timestamp,
    retries int,
    PRIMARY KEY (entity_urn)
);

CREATE TABLE minds.user_snapshots (
CREATE TABLE minds.user_snapshots (
  user_guid varint,
  user_guid varint,
  type text,
  type text,
+25 −7
Original line number Original line Diff line number Diff line
@@ -4,10 +4,10 @@
 */
 */
namespace Minds\Core\Search;
namespace Minds\Core\Search;


use Exception;
use Minds\Core;
use Minds\Core;
use Minds\Core\Di\Di;
use Minds\Core\Di\Di;
use Minds\Core\Events\Event;
use Minds\Core\Events\Event;
use Minds\Entities;


class Events
class Events
{
{
@@ -79,12 +79,30 @@ class Events
                    return;
                    return;
                }
                }


                Di::_()->get('Search\Index')
                $entity = is_string($params['entity']) ?
                    ->index(
                        is_string($params['entity']) ?
                    unserialize($params['entity']) :
                    unserialize($params['entity']) :
                            $params['entity']
                    $params['entity'];
                    );

                try {
                    $wasIndexed = (bool) Di::_()->get('Search\Index')
                        ->index($entity);
                } catch (Exception $e) {
                    error_log("[Search/Events/search:index:dispatch] {$e}");
                    $wasIndexed = false;
                }

                // BannedException will return null (which is also falsy)
                // So we should retry only on non-null responses from index()
                if ($wasIndexed !== null) {
                    /** @var Core\Search\RetryQueue\Manager $retryQueueManager */
                    $retryQueueManager = Di::_()->get('Search\RetryQueue\Manager');

                    if ($wasIndexed) {
                        $retryQueueManager->prune($entity);
                    } else {
                        $retryQueueManager->retry($entity);
                    }
                }


            } catch (\Exception $e) {
            } catch (\Exception $e) {
                error_log('[Search/Events/search:index:dispatch] ' . get_class($e) . ': ' . $e->getMessage());
                error_log('[Search/Events/search:index:dispatch] ' . get_class($e) . ': ' . $e->getMessage());
+1 −1
Original line number Original line Diff line number Diff line
@@ -93,7 +93,7 @@ class Index
                }
                }
            }
            }
        } catch (BannedException $e) {
        } catch (BannedException $e) {
            $result = false;
            $result = null;
        } catch (\Exception $e) {
        } catch (\Exception $e) {
            error_log('[Search/Index] ' . get_class($e) . ": {$e->getMessage()}");
            error_log('[Search/Index] ' . get_class($e) . ": {$e->getMessage()}");
            $result = false;
            $result = false;
+84 −0
Original line number Original line Diff line number Diff line
<?php
/**
 * RetryQueueManager
 * @author edgebal
 */

namespace Minds\Core\Search\RetryQueue;

use Exception;
use Minds\Common\Urn;
use Minds\Core\Di\Di;
use Minds\Core\Events\EventsDispatcher;

class Manager
{
    /** @var EventsDispatcher */
    protected $eventsDispatcher;

    /** @var Repository */
    protected $repository;

    /**
     * RetryQueueManager constructor.
     * @param EventsDispatcher $eventsDispatcher
     * @param Repository $repository
     */
    public function __construct(
        $eventsDispatcher = null,
        $repository = null
    )
    {
        $this->eventsDispatcher = $eventsDispatcher ?: Di::_()->get('EventsDispatcher');
        $this->repository = $repository ?: new Repository();
    }

    /**
     * @param mixed $entity
     * @return bool
     * @throws Exception
     */
    public function prune($entity)
    {
        $urn = (string) (new Urn($entity->guid));

        $retryQueueEntry = new RetryQueueEntry();
        $retryQueueEntry
            ->setEntityUrn($urn);

        return (bool) $this->repository->delete($retryQueueEntry);
    }

    /**
     * @param mixed $entity
     * @return bool
     * @throws Exception
     */
    public function retry($entity)
    {
        $urn = (string) (new Urn($entity->guid));

        $retryQueueEntry = $this->repository->get($urn);
        $retries = $retryQueueEntry->getRetries() + 1;

        $retryQueueEntry
            ->setLastRetry(time())
            ->setRetries($retries);

        $retrySaved = $this->repository->add($retryQueueEntry);

        if (!$retrySaved) {
            error_log("[RetryQueueManager] Critical: Cannot save retry to queue table: {$urn}");
        } elseif ($retries < 5) {
            error_log("[RetryQueueManager] Warn: Re-queue: {$urn}");

            $this->eventsDispatcher->trigger('search:index', 'all', [
                'entity' => $entity
            ]);
        } else {
            error_log("[RetryQueueManager] Critical: Too many retries indexing: {$urn}");
        }

        return $retrySaved;
    }
}
+176 −0
Original line number Original line Diff line number Diff line
<?php
/**
 * Repository
 * @author edgebal
 */

namespace Minds\Core\Search\RetryQueue;

use Cassandra\Rows;
use Cassandra\Timestamp;
use Exception;
use Minds\Common\Repository\Response;
use Minds\Core\Data\Cassandra\Client as CassandraClient;
use Minds\Core\Data\Cassandra\Prepared\Custom;
use Minds\Core\Di\Di;

class Repository
{
    /** @var CassandraClient */
    protected $db;

    public function __construct(
        $db = null
    )
    {
        $this->db = $db ?: Di::_()->get('Database\Cassandra\Cql');
    }

    /**
     * @param array $opts
     * @return Response
     */
    public function getList(array $opts = [])
    {
        $opts = array_merge([
            'entity_urn' => null,
        ], $opts);

        $cql = "SELECT * FROM search_dispatcher_queue";
        $values = [];
        $cqlOpts = [];

        if ($opts['entity_urn']) {
            $cql .= ' WHERE entity_urn = ?';
            $values[] = $opts['entity_urn'];
        }

        if ($opts['limit'] ?? null) {
            $cqlOpts['page_size'] = (int) $opts['limit'];
        }

        if ($opts['offset'] ?? null) {
            $cqlOpts['paging_state_token'] = base64_decode($opts['offset']);
        }

        $prepared = new Custom();
        $prepared->query($cql, $values);
        $prepared->setOpts($cqlOpts);

        $response = new Response();

        try {
            /** @var Rows $rows */
            $rows = $this->db->request($prepared);

            if ($rows) {
                foreach ($rows as $row) {
                    $retryQueueEntry = new RetryQueueEntry();
                    $retryQueueEntry
                        ->setEntityUrn($row['entity_urn'])
                        ->setLastRetry($row['last_retry']->time())
                        ->setRetries($row['retries']);

                    $response[] = $retryQueueEntry;
                }

                $response->setPagingToken(base64_encode($rows->pagingStateToken()));
                $response->setLastPage($rows->isLastPage());
            }
        } catch (Exception $e) {
            error_log($e);
            $response->setException($e);
        }

        return $response;
    }

    /**
     * @param $urn
     * @return RetryQueueEntry
     */
    public function get($urn)
    {
        $retryQueueEntries = $this->getList([
            'entity_urn' => $urn,
        ])->toArray();

        if (count($retryQueueEntries)) {
            return $retryQueueEntries[0];
        } else {
            $retryQueueEntry = new RetryQueueEntry();
            $retryQueueEntry
                ->setEntityUrn($urn)
                ->setLastRetry(time())
                ->setRetries(0);

            return $retryQueueEntry;
        }
    }

    /**
     * @param RetryQueueEntry $retryQueueEntry
     * @return bool
     * @throws Exception
     */
    public function add(RetryQueueEntry $retryQueueEntry)
    {
        if (!$retryQueueEntry->getEntityUrn()) {
            throw new Exception('Missing URN');
        }

        $cql = "INSERT INTO search_dispatcher_queue (entity_urn, last_retry, retries) VALUES (?, ?, ?)";
        $values = [
            (string) $retryQueueEntry->getEntityUrn(),
            new Timestamp($retryQueueEntry->getLastRetry()),
            (int) $retryQueueEntry->getRetries(),
        ];

        $prepared = new Custom();
        $prepared->query($cql, $values);

        try {
            return (bool) $this->db->request($prepared, true);
        } catch (Exception $e) {
            error_log($e);
            return false;
        }
    }

    /**
     * @param RetryQueueEntry $retryQueueEntry
     * @return bool
     * @throws Exception
     */
    public function update(RetryQueueEntry $retryQueueEntry)
    {
        return $this->add($retryQueueEntry);
    }

    /**
     * @param RetryQueueEntry $retryQueueEntry
     * @return bool
     * @throws Exception
     */
    public function delete(RetryQueueEntry $retryQueueEntry)
    {
        if (!$retryQueueEntry->getEntityUrn()) {
            throw new Exception('Missing URN');
        }

        $cql = "DELETE FROM search_dispatcher_queue WHERE entity_urn = ?";
        $values = [
            (string) $retryQueueEntry->getEntityUrn(),
        ];

        $prepared = new Custom();
        $prepared->query($cql, $values);

        try {
            return (bool) $this->db->request($prepared, true);
        } catch (Exception $e) {
            error_log($e);
            return false;
        }
    }
}
Loading