Commit 118395de authored by Mark Harding's avatar Mark Harding
Browse files

(feat): entity centric metrics

parent 72c7048d
Pipeline #84060361 failed with stages
in 4 minutes and 20 seconds
<?php
namespace Minds\Controllers\Cli;
use Minds\Core;
use Minds\Core\Analytics\EntityCentric\Manager;
use Minds\Cli;
use Minds\Interfaces;
use Minds\Exceptions;
use Minds\Entities;
class EntityCentric extends Cli\Controller implements Interfaces\CliControllerInterface
{
public function __construct()
{
}
public function help($command = null)
{
$this->out('TBD');
}
public function exec()
{
$this->out('Missing subcommand');
}
public function sync()
{
error_reporting(E_ALL);
ini_set('display_errors', 1);
$daysAgo = $this->getOpt('daysAgo') ?: 0;
$from = $this->getOpt('from') ?: strtotime("midnight $daysAgo days ago");
$manager = new Manager();
$manager->setFrom($from);
$i = 0;
foreach ($manager->sync() as $record) {
$this->out(++$i);
}
}
}
<?php
/**
* EntityCentricRecord
* @author Mark
*/
namespace Minds\Core\Analytics\EntityCentric;
use Minds\Traits\MagicAttributes;
/**
* Class EntityCentricRecord
* @package Minds\Core\Analytics\Views
* @method DownsampledView setResolution(int $year)
* @method string getResolution()
* @method DownsampledView setEntityUrn(string $entityUrn)
* @method string getEntityUrn()
* @method DownsampledView setOwnerGuid(string $ownerGuid)
* @method string getOwnerGuid()
* @method DownsampledView setTimestampMs(int $timestampMs)
* @method int getTimestampMs()
* @method DownsampledView setViews(int $views)
* @method int getViews()
*/
class EntityCentricRecord
{
use MagicAttributes;
/** @var string */
private $resolution;
/** @var int */
protected $timestampMs;
/** @var string */
protected $entityUrn;
/** @var string */
protected $ownerGuid;
/** @var array */
private $sums;
/**
* Increment views
* @return DownsampledView
*/
public function incrementSum($metric): EntityCentricRecord
{
if (!isset($this->sums[$metric])) {
$this->sums[$metric] = 0;
}
++$this->sums[$metric];
return $this;
}
}
<?php
/**
* EntityCentric Manager
* @author Mark
*/
namespace Minds\Core\Analytics\EntityCentric;
use DateTime;
use Exception;
class Manager
{
/** @var array */
const SYNCHRONISERS = [
ViewsSynchroniser::class,
];
/** @var Repository */
protected $repository;
/** @var int */
private $from;
/** @var int */
private $to;
public function __construct(
$repository = null
) {
$this->repository = $repository ?: new Repository();
}
public function setFrom($from): self
{
$this->from = $from;
return $this;
}
/**
* Synchronise views from cassandra to elastic
* @return void
*/
public function sync()
{
foreach (Manager::SYNCHRONISERS as $synchroniserClass) {
$synchroniser = new $synchroniserClass;
$date = (new DateTime())->setTimestamp($this->from);
$synchroniser->setFrom($this->from);
foreach ($synchroniser->toRecords() as $record) {
$this->add($record);
yield $record;
}
}
// Call again incase any leftover
$this->repository->bulk();
echo "done";
}
/**
* Add an entity centric record to the database
* @param EntityCentricRecord $record
* @return bool
*/
public function add(EntityCentricRecord $record): bool
{
return (bool) $this->repository->add($record);
}
}
<?php
/**
* EntityCentric Repository
* @author Mark
*/
namespace Minds\Core\Analytics\EntityCentric;
use Minds\Core\Analytics\Views\DownsampledView;
use DateTime;
use DateTimeZone;
use Exception;
use Minds\Common\Repository\Response;
use Minds\Core\Data\ElasticSearch\Client as ElasticClient;
use Minds\Core\Di\Di;
class Repository
{
/** @var ElasticClient */
protected $es;
/** @var array $pendingBulkInserts * */
private $pendingBulkInserts = [];
/**
* Repository constructor.
* @param ElasticClient $es
*/
public function __construct(
$es = null
) {
$this->es = $es ?: Di::_()->get('Database\ElasticSearch');
}
/**
* @param array $opts
* @return Response
*/
public function getList(array $opts = [])
{
$response = new Response();
return $response;
}
/**
* @param EntityCentricRecord $record
* @return bool
* @throws Exception
*/
public function add(EntityCentricRecord $record)
{
$index = 'minds-entitycentric-' . date('m-Y', $record->getTimestamp());
$body = [
'resolution' => $record->getResolution(),
'@timestamp' => $record->getTimestamp() * 1000,
'entity_urn' => $record->getEntityUrn(),
'owner_guid' => $record->getOwnerGuid(),
];
$body = array_merge($body, $record->getSums());
$body = array_filter($body, function ($val) {
if ($val === '' || $val === null) {
return false;
}
return true;
});
$this->pendingBulkInserts[] = [
'update' => [
'_id' => (string) implode('-', [ $record->getEntityUrn(), $record->getResolution(), $record->getTimestamp() ]),
'_index' => $index,
'_type' => '_doc',
],
];
$this->pendingBulkInserts[] = [
'doc' => $body,
'doc_as_upsert' => true,
];
if (count($this->pendingBulkInserts) > 2000) { //1000 inserts
$this->bulk();
}
}
/**
* Bulk insert results
*/
public function bulk()
{
if (count($this->pendingBulkInserts) > 0) {
$res = $this->es->bulk(['body' => $this->pendingBulkInserts]);
$this->pendingBulkInserts = [];
}
}
}
<?php
namespace Minds\Core\Analytics\EntityCentric;
use Minds\Core\Analytics\Views\Repository as ViewsRepository;
use DateTime;
use Exception;
class ViewsSynchroniser
{
/** @var array */
private $records = [];
/** @var ViewsRepository */
private $viewsRepository;
public function __construct($viewsRepository = null)
{
$this->viewsRepository = $viewsRepository ?: new ViewsRepository();
}
public function setFrom($from): self
{
$this->from = $from;
return $this;
}
public function toRecords()
{
$date = (new DateTime())->setTimestamp($this->from);
$opts['day'] = intval($date->format('d'));
$opts['month'] = intval($date->format('m'));
$opts['year'] = $date->format('Y');
$opts['from'] = $this->from;
$i = 0;
while (true) {
$result = $this->viewsRepository->getList($opts);
$opts['offset'] = $result->getPagingToken();
foreach ($result as $view) {
// if (!in_array($view->getSource(), [ 'single', 'feed/channel'])) {
// continue;
// }
$this->downsampleViewToRecord($view);
error_log(++$i);
}
if ($result->isLastPage()) {
break;
}
}
foreach ($this->records as $record) {
yield $record;
}
}
/**
* Add entity to map
* @param View $view
* @return void
*/
private function downsampleViewToRecord($view): void
{
$entityUrn = $view->getEntityUrn();
if (!isset($this->records[$view->getEntityUrn()])) {
$record = new EntityCentricRecord();
$record->setEntityUrn($view->getEntityUrn())
->setOwnerGuid($view->getOwnerGuid())
->setTimestamp($view->getTimestamp());
$this->records[$view->getEntityUrn()] = $record;
}
if ($view->getCampaign()) {
$this->records[$view->getEntityUrn()]->incrementSum('views::boosted');
} else {
$this->records[$view->getEntityUrn()]->incrementSum('views::organic');
}
if ($view->getSource() === 'single') {
$this->records[$view->getEntityUrn()]->incrementSum('views::single');
}
$this->records[$view->getEntityUrn()]->incrementSum('views::total');
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment