Skip to content
Commits on Source (2)
<?php
namespace Minds\Controllers\Cli;
use Minds\Core;
use Minds\Core\Analytics\EntityCentric\Manager;
use Minds\Cli;
use Minds\Interfaces;
use Minds\Exceptions;
use Minds\Entities;
class EntityCentric extends Cli\Controller implements Interfaces\CliControllerInterface
{
public function __construct()
{
}
public function help($command = null)
{
$this->out('TBD');
}
public function exec()
{
$this->out('Missing subcommand');
}
public function sync()
{
error_reporting(E_ALL);
ini_set('display_errors', 1);
$daysAgo = $this->getOpt('daysAgo') ?: 0;
$from = $this->getOpt('from') ?: strtotime("midnight $daysAgo days ago");
$manager = new Manager();
$manager->setFrom($from);
$i = 0;
foreach ($manager->sync() as $record) {
$this->out(++$i);
}
}
}
<?php
namespace Minds\Core\Analytics\EntityCentric;
use Minds\Core\Analytics\Metrics\Active;
use DateTime;
use Exception;
class ActiveUsersSynchroniser
{
/** @var array */
private $records = [];
/** @var Active */
private $activeMetric;
public function __construct($activeMetric = null)
{
$this->activeMetric = $activeMetric ?? new Active();
}
/**
* @param int $from
* @return self
*/
public function setFrom($from): self
{
$this->from = $from;
return $this;
}
/**
* Convert to records
* @return iterable
*/
public function toRecords(): iterable
{
$date = (new DateTime())->setTimestamp($this->from);
$now = new DateTime();
$days = (int) $date->diff($now)->format('%a');
$months = round($days / 28);
// Daily resolution
foreach ($this->activeMetric->get($days ?: 1) as $bucket) {
$record = new EntityCentricRecord();
$record->setEntityUrn("urn:metric:global")
->setOwnerGuid((string) 0) // Site is owner
->setTimestamp($bucket['timestamp'])
->setResolution('day')
->incrementSum('active::total', $bucket['total']);
$this->records[] = $record;
}
// Monthly resolution
foreach ($this->activeMetric->get($months ?: 1, 'month') as $bucket) {
$record = new EntityCentricRecord();
$record->setEntityUrn("urn:metric:global")
->setOwnerGuid((string) 0) // Site is owner
->setTimestamp($bucket['timestamp'])
->setResolution('month')
->incrementSum('active::total', $bucket['total']);
$this->records[] = $record;
}
foreach ($this->records as $record) {
yield $record;
}
}
}
<?php
/**
* EntityCentricRecord
* @author Mark
*/
namespace Minds\Core\Analytics\EntityCentric;
use Minds\Traits\MagicAttributes;
/**
* Class EntityCentricRecord
* @package Minds\Core\Analytics\EntityCentric
* @method EntityCentricRecord setResolution(int $year)
* @method string getResolution()
* @method EntityCentricRecord setEntityUrn(string $entityUrn)
* @method string getEntityUrn()
* @method EntityCentricRecord setOwnerGuid(string $ownerGuid)
* @method string getOwnerGuid()
* @method EntityCentricRecord setTimestampMs(int $timestampMs)
* @method int getTimestampMs()
* @method EntityCentricRecord setTimestamp(int $timestamp)
* @method int getTimestamp()
* @method EntityCentricRecord setSums(array $sums)
* @method int getSums()
*/
class EntityCentricRecord
{
use MagicAttributes;
/** @var string */
private $resolution;
/** @var int */
protected $timestamp;
/** @var int */
protected $timestampMs;
/** @var string */
protected $entityUrn;
/** @var string */
protected $ownerGuid;
/** @var array */
private $sums;
/**
* Increment views
* @param string $metric
* @param int $value
* @return EntityCentricRecord
*/
public function incrementSum($metric, $value = 1): EntityCentricRecord
{
if (!isset($this->sums[$metric])) {
$this->sums[$metric] = 0;
}
$this->sums[$metric] = $this->sums[$metric] + $value;
return $this;
}
}
<?php
/**
* EntityCentric Manager
* @author Mark
*/
namespace Minds\Core\Analytics\EntityCentric;
use DateTime;
use Exception;
class Manager
{
/** @var array */
const SYNCHRONISERS = [
SignupsSynchroniser::class,
ActiveUsersSynchroniser::class,
ViewsSynchroniser::class,
];
/** @var Repository */
protected $repository;
/** @var int */
private $from;
/** @var int */
private $to;
public function __construct(
$repository = null
) {
$this->repository = $repository ?: new Repository();
}
/**
* @param int $from
* @return self
*/
public function setFrom($from): self
{
$this->from = $from;
return $this;
}
/**
* Synchronise views from cassandra to elastic
* @return iterable
*/
public function sync(): iterable
{
foreach (Manager::SYNCHRONISERS as $synchroniserClass) {
$synchroniser = new $synchroniserClass;
$date = (new DateTime())->setTimestamp($this->from);
$synchroniser->setFrom($this->from);
foreach ($synchroniser->toRecords() as $record) {
$this->add($record);
yield $record;
}
// Call again incase any leftover
$this->repository->bulk();
}
echo "done";
}
/**
* Add an entity centric record to the database
* @param EntityCentricRecord $record
* @return bool
*/
public function add(EntityCentricRecord $record): bool
{
return (bool) $this->repository->add($record);
}
/**
* Query aggregate
* @param array $query
* @return array
*/
public function getAggregateByQuery(array $query): array
{
}
}
<?php
/**
* EntityCentric Repository
* @author Mark
*/
namespace Minds\Core\Analytics\EntityCentric;
use DateTime;
use DateTimeZone;
use Exception;
use Minds\Common\Repository\Response;
use Minds\Core\Data\ElasticSearch\Client as ElasticClient;
use Minds\Core\Di\Di;
class Repository
{
/** @var ElasticClient */
protected $es;
/** @var array $pendingBulkInserts * */
private $pendingBulkInserts = [];
/**
* Repository constructor.
* @param ElasticClient $es
*/
public function __construct(
$es = null
) {
$this->es = $es ?: Di::_()->get('Database\ElasticSearch');
}
/**
* @param array $opts
* @return Response
*/
public function getList(array $opts = [])
{
$response = new Response();
return $response;
}
/**
* @param EntityCentricRecord $record
* @return bool
* @throws Exception
*/
public function add(EntityCentricRecord $record)
{
$index = 'minds-entitycentric-' . date('m-Y', $record->getTimestamp());
$body = [
'resolution' => $record->getResolution(),
'@timestamp' => $record->getTimestamp() * 1000,
'entity_urn' => $record->getEntityUrn(),
'owner_guid' => $record->getOwnerGuid(),
];
$body = array_merge($body, $record->getSums());
$body = array_filter($body, function ($val) {
if ($val === '' || $val === null) {
return false;
}
return true;
});
$this->pendingBulkInserts[] = [
'update' => [
'_id' => (string) implode('-', [ $record->getEntityUrn(), $record->getResolution(), $record->getTimestamp() ]),
'_index' => $index,
'_type' => '_doc',
],
];
$this->pendingBulkInserts[] = [
'doc' => $body,
'doc_as_upsert' => true,
];
if (count($this->pendingBulkInserts) > 2000) { //1000 inserts
$this->bulk();
}
}
/**
* Bulk insert results
*/
public function bulk()
{
if (count($this->pendingBulkInserts) > 0) {
$res = $this->es->bulk(['body' => $this->pendingBulkInserts]);
$this->pendingBulkInserts = [];
}
}
}
<?php
namespace Minds\Core\Analytics\EntityCentric;
use Minds\Core\Analytics\Metrics\Signup;
use DateTime;
use Exception;
class SignupsSynchroniser
{
/** @var array */
private $records = [];
/** @var Signup */
private $signupMetric;
public function __construct($signupMetric = null)
{
$this->signupMetric = $signupMetric ?? new Signup;
}
/**
* @param int $from
* @return self
*/
public function setFrom($from): self
{
$this->from = $from;
return $this;
}
/**
* Convert to records
* @return iterable
*/
public function toRecords(): iterable
{
$date = (new DateTime())->setTimestamp($this->from);
$now = new DateTime();
$days = (int) $date->diff($now)->format('%a');
foreach ($this->signupMetric->get($days) as $bucket) {
error_log($bucket['date']);
$record = new EntityCentricRecord();
$record->setEntityUrn("urn:metric:global")
->setOwnerGuid((string) 0) // Site is owner
->setTimestamp($bucket['timestamp'])
->setResolution('day')
->incrementSum('signups::total', $bucket['total']);
$this->records[] = $record;
}
foreach ($this->records as $record) {
yield $record;
}
}
}
<?php
namespace Minds\Core\Analytics\EntityCentric;
use Minds\Core\Analytics\Views\Repository as ViewsRepository;
use DateTime;
use Exception;
class ViewsSynchroniser
{
/** @var array */
private $records = [];
/** @var ViewsRepository */
private $viewsRepository;
public function __construct($viewsRepository = null)
{
$this->viewsRepository = $viewsRepository ?: new ViewsRepository();
}
/**
* @param int $from
* @return self
*/
public function setFrom($from): self
{
$this->from = $from;
return $this;
}
/**
* Convert to records
* @return iterable
*/
public function toRecords(): iterable
{
$date = (new DateTime())->setTimestamp($this->from);
$opts['day'] = intval($date->format('d'));
$opts['month'] = intval($date->format('m'));
$opts['year'] = $date->format('Y');
$opts['from'] = $this->from;
$i = 0;
while (true) {
$result = $this->viewsRepository->getList($opts);
$opts['offset'] = $result->getPagingToken();
foreach ($result as $view) {
// if (!in_array($view->getSource(), [ 'single', 'feed/channel'])) {
// continue;
// }
$this->downsampleViewToRecord($view);
error_log(++$i);
}
if ($result->isLastPage()) {
break;
}
}
foreach ($this->records as $record) {
yield $record;
}
}
/**
* Add entity to map
* @param View $view
* @return void
*/
private function downsampleViewToRecord($view): void
{
$entityUrn = $view->getEntityUrn();
if (!isset($this->records[$view->getEntityUrn()])) {
$timestamp = (new \DateTime())->setTimestamp($view->getTimestamp())->setTime(0, 0, 0);
$record = new EntityCentricRecord();
$record->setEntityUrn($view->getEntityUrn())
->setOwnerGuid($view->getOwnerGuid())
->setTimestamp($timestamp->getTimestamp())
->setResolution('day');
$this->records[$view->getEntityUrn()] = $record;
}
if ($view->getCampaign()) {
$this->records[$view->getEntityUrn()]->incrementSum('views::boosted');
} else {
$this->records[$view->getEntityUrn()]->incrementSum('views::organic');
}
if ($view->getSource() === 'single') {
$this->records[$view->getEntityUrn()]->incrementSum('views::single');
}
$this->records[$view->getEntityUrn()]->incrementSum('views::total');
}
}