Commit 7a65572f authored by Mark Harding's avatar Mark Harding
Browse files

Entity Centric Manager

parent 416076ce
Loading
Loading
Loading
Loading
+43 −0
Original line number Original line Diff line number Diff line
<?php

namespace Minds\Controllers\Cli;

use Minds\Core;
use Minds\Core\Analytics\EntityCentric\Manager;
use Minds\Cli;
use Minds\Interfaces;
use Minds\Exceptions;
use Minds\Entities;

/**
 * CLI controller exposing entity-centric analytics commands.
 */
class EntityCentric extends Cli\Controller implements Interfaces\CliControllerInterface
{
    /**
     * Empty constructor: performs no set-up and does not call the parent constructor.
     */
    public function __construct()
    {
    }

    /**
     * Print help text for this controller (currently a placeholder message).
     * @param string|null $command Subcommand help was requested for; not used.
     */
    public function help($command = null)
    {
        $this->out('TBD');
    }

    /**
     * Default action when no subcommand is supplied; only reports the omission.
     */
    public function exec()
    {
        $this->out('Missing subcommand');
    }

    /**
     * Run every entity-centric synchroniser and print a running count of the
     * records processed.
     *
     * Options read via getOpt():
     *  - `from`: start timestamp; when absent or falsy it is derived from `daysAgo`.
     *  - `daysAgo`: number of days back from today's midnight (defaults to 0).
     */
    public function sync()
    {
        // Surface every error while the sync is running.
        error_reporting(E_ALL);
        ini_set('display_errors', 1);

        $daysAgo = $this->getOpt('daysAgo');
        if (!$daysAgo) {
            $daysAgo = 0;
        }

        $startTimestamp = $this->getOpt('from');
        if (!$startTimestamp) {
            $startTimestamp = strtotime("midnight {$daysAgo} days ago");
        }

        $entityCentricManager = (new Manager())->setFrom($startTimestamp);

        // The yielded records are not inspected here; iterating drives the
        // generator, and each step prints the running total.
        $processed = 0;
        foreach ($entityCentricManager->sync() as $unused) {
            $processed += 1;
            $this->out($processed);
        }
    }
}
+68 −0
Original line number Original line Diff line number Diff line
<?php
namespace Minds\Core\Analytics\EntityCentric;

use Minds\Core\Analytics\Metrics\Active;
use DateTime;
use Exception;

/**
 * Converts the buckets returned by the Active metric into site-wide
 * EntityCentricRecord objects, at both daily and monthly resolution.
 */
class ActiveUsersSynchroniser
{
    /** @var Active */
    private $activeMetric;

    /** @var int Unix timestamp marking the start of the window to synchronise */
    private $from;

    /**
     * @param Active|null $activeMetric Metric source; a new Active is created when omitted
     */
    public function __construct($activeMetric = null)
    {
        $this->activeMetric = $activeMetric ?? new Active();
    }

    /**
     * @param int $from Unix timestamp of the start of the window
     * @return self
     */
    public function setFrom($from): self
    {
        $this->from = $from;
        return $this;
    }

    /**
     * Convert to records
     *
     * The span requested from the metric is the number of whole days between
     * the configured start and now (and that number divided by 28, rounded,
     * for months); a span of zero is bumped to 1 so one bucket is always asked for.
     *
     * All buckets are fetched and converted before the first record is yielded.
     * Records are collected in a local list, so calling this method again does
     * not replay records from an earlier call.
     *
     * @return iterable Generator of EntityCentricRecord, daily records first
     */
    public function toRecords(): iterable
    {
        $date = (new DateTime())->setTimestamp($this->from);
        $now = new DateTime();
        $days = (int) $date->diff($now)->format('%a');
        // round() returns a float; the span is a whole number of months
        $months = (int) round($days / 28);

        $records = [];

        // Daily resolution
        foreach ($this->activeMetric->get($days ?: 1) as $bucket) {
            $records[] = $this->buildRecord($bucket, 'day');
        }

        // Monthly resolution
        foreach ($this->activeMetric->get($months ?: 1, 'month') as $bucket) {
            $records[] = $this->buildRecord($bucket, 'month');
        }

        foreach ($records as $record) {
            yield $record;
        }
    }

    /**
     * Build the global (site-owned) active-users record for a single bucket.
     * @param array $bucket Bucket exposing 'timestamp' and 'total' entries
     * @param string $resolution Resolution label stored on the record ('day' or 'month')
     * @return EntityCentricRecord
     */
    private function buildRecord($bucket, string $resolution): EntityCentricRecord
    {
        $record = new EntityCentricRecord();
        $record->setEntityUrn("urn:metric:global")
            ->setOwnerGuid((string) 0) // Site is owner
            ->setTimestamp($bucket['timestamp'])
            ->setResolution($resolution)
            ->incrementSum('active::total', $bucket['total']);

        return $record;
    }
}
+63 −0
Original line number Original line Diff line number Diff line
<?php
/**
 * EntityCentricRecord
 * @author Mark
 */

namespace Minds\Core\Analytics\EntityCentric;

use Minds\Traits\MagicAttributes;

/**
 * Class EntityCentricRecord
 *
 * One row of entity-centric analytics: a set of named sums for a single
 * entity, at a given resolution and timestamp.
 *
 * @package Minds\Core\Analytics\EntityCentric
 * @method EntityCentricRecord setResolution(string $resolution)
 * @method string getResolution()
 * @method EntityCentricRecord setEntityUrn(string $entityUrn)
 * @method string getEntityUrn()
 * @method EntityCentricRecord setOwnerGuid(string $ownerGuid)
 * @method string getOwnerGuid()
 * @method EntityCentricRecord setTimestampMs(int $timestampMs)
 * @method int getTimestampMs()
 * @method EntityCentricRecord setTimestamp(int $timestamp)
 * @method int getTimestamp()
 * @method EntityCentricRecord setSums(array $sums)
 * @method array getSums()
 */
class EntityCentricRecord
{
    use MagicAttributes;

    /** @var string */
    private $resolution;

    /** @var int */
    protected $timestamp;

    /** @var int */
    protected $timestampMs;

    /** @var string */
    protected $entityUrn;

    /** @var string */
    protected $ownerGuid;

    /**
     * Running totals keyed by metric name. Starts as an empty array so the
     * property holds an array even before the first increment.
     * @var array
     */
    private $sums = [];

    /**
     * Add a value to the running total of a named metric, starting the
     * metric at zero the first time it is seen.
     * @param string $metric
     * @param int $value Amount to add (defaults to 1)
     * @return EntityCentricRecord
     */
    public function incrementSum($metric, $value = 1): EntityCentricRecord
    {
        $this->sums[$metric] = ($this->sums[$metric] ?? 0) + $value;
        return $this;
    }
}
+84 −0
Original line number Original line Diff line number Diff line
<?php
/**
 * EntityCentric Manager
 * @author Mark
 */

namespace Minds\Core\Analytics\EntityCentric;

use DateTime;
use Exception;

/**
 * Runs every registered synchroniser and stores the records they produce
 * through the repository.
 */
class Manager
{
    /** @var array Synchroniser classes, run in this order by sync() */
    const SYNCHRONISERS = [
        SignupsSynchroniser::class,
        ActiveUsersSynchroniser::class,
        ViewsSynchroniser::class,
    ];

    /** @var Repository */
    protected $repository;

    /** @var int */
    private $from;

    /** @var int */
    private $to;

    /**
     * @param Repository|null $repository A new Repository is created when omitted
     */
    public function __construct(
        $repository = null
    ) {
        $this->repository = $repository ?: new Repository();
    }

    /**
     * @param int $from Start timestamp handed to every synchroniser
     * @return self
     */
    public function setFrom($from): self
    {
        $this->from = $from;
        return $this;
    }

    /**
     * Run each synchroniser in turn, adding every record it produces to the
     * repository and yielding it to the caller.
     *
     * This is a generator: no work happens until it is iterated.
     *
     * @return iterable Generator of EntityCentricRecord
     */
    public function sync(): iterable
    {
        foreach (self::SYNCHRONISERS as $synchroniserClass) {
            $synchroniser = new $synchroniserClass();
            $synchroniser->setFrom($this->from);

            foreach ($synchroniser->toRecords() as $record) {
                $this->add($record);
                yield $record;
            }

            // The repository only flushes once its buffer fills up, so push
            // out whatever is left after each synchroniser.
            $this->repository->bulk();
        }
        echo "done";
    }

    /**
     * Add an entity centric record to the database
     * @param EntityCentricRecord $record
     * @return bool The repository's result cast to bool
     */
    public function add(EntityCentricRecord $record): bool
    {
        return (bool) $this->repository->add($record);
    }

    /**
     * Query aggregate
     *
     * Not implemented yet. The method used to have an empty body, which
     * violated its declared array return type and failed with a TypeError
     * on every call; it now fails with an explicit message instead.
     *
     * @param array $query
     * @return array
     * @throws Exception Always, until the query is implemented
     */
    public function getAggregateByQuery(array $query): array
    {
        throw new Exception('EntityCentric aggregate queries are not implemented');
    }
}
+98 −0
Original line number Original line Diff line number Diff line
<?php
/**
 * EntityCentric Repository
 * @author Mark
 */

namespace Minds\Core\Analytics\EntityCentric;

use DateTime;
use DateTimeZone;
use Exception;
use Minds\Common\Repository\Response;
use Minds\Core\Data\ElasticSearch\Client as ElasticClient;
use Minds\Core\Di\Di;

/**
 * Buffers entity-centric records as Elasticsearch bulk upserts and sends
 * them in batches.
 */
class Repository
{
    /** @var ElasticClient */
    protected $es;

    /** @var array Bulk payload lines waiting to be sent (two per record) */
    private $pendingBulkInserts = [];

    /**
     * Repository constructor.
     * @param ElasticClient $es Resolved from the DI container when omitted
     */
    public function __construct(
        $es = null
    ) {
        $this->es = $es ?: Di::_()->get('Database\ElasticSearch');
    }

    /**
     * Placeholder: currently returns an empty Response regardless of $opts.
     * @param array $opts
     * @return Response
     */
    public function getList(array $opts = [])
    {
        $response = new Response();

        return $response;
    }

    /**
     * Queue a record as an upsert, flushing the queue once it holds 1000 records.
     *
     * The document goes to a per-month index (`minds-entitycentric-<mm>-<YYYY>`)
     * and its id is built from the entity URN, resolution and timestamp.
     *
     * @param EntityCentricRecord $record
     * @return bool Always true once the record has been queued
     * @throws Exception NOTE(review): presumably propagated from the
     *                   Elasticsearch client during a flush; confirm
     */
    public function add(EntityCentricRecord $record)
    {
        $index = 'minds-entitycentric-' . date('m-Y', $record->getTimestamp());

        $body = [
            'resolution' => $record->getResolution(),
            '@timestamp' => $record->getTimestamp() * 1000, // seconds to milliseconds
            'entity_urn' => $record->getEntityUrn(),
            'owner_guid' => $record->getOwnerGuid(),
        ];

        // A record that was never incremented may report no sums at all;
        // array_merge() rejects a non-array argument.
        $body = array_merge($body, $record->getSums() ?? []);

        // Drop empty-string and null fields; 0 and '0' are kept.
        $body = array_filter($body, static function ($val) {
            return $val !== '' && $val !== null;
        });

        $this->pendingBulkInserts[] = [
            'update' => [
                '_id' => implode('-', [
                    $record->getEntityUrn(),
                    $record->getResolution(),
                    $record->getTimestamp(),
                ]),
                '_index' => $index,
                '_type' => '_doc',
            ],
        ];

        $this->pendingBulkInserts[] = [
            'doc' => $body,
            'doc_as_upsert' => true,
        ];

        // Each record contributes two entries, so 2000 entries = 1000 records.
        if (count($this->pendingBulkInserts) >= 2000) {
            $this->bulk();
        }

        return true;
    }

    /**
     * Send every queued operation in one bulk request and empty the queue.
     * Does nothing when the queue is empty.
     *
     * NOTE(review): the bulk response is not inspected, so per-item failures
     * reported by Elasticsearch go unnoticed.
     */
    public function bulk()
    {
        if (count($this->pendingBulkInserts) > 0) {
            $this->es->bulk(['body' => $this->pendingBulkInserts]);
            $this->pendingBulkInserts = [];
        }
    }
}
Loading