Loading Controllers/Cli/Analytics.php +12 −6 Original line number Original line Diff line number Diff line Loading @@ -10,14 +10,12 @@ use Minds\Interfaces; class Analytics extends Cli\Controller implements Interfaces\CliControllerInterface class Analytics extends Cli\Controller implements Interfaces\CliControllerInterface { { private $start; private $elasticsearch; public function help($command = null) public function help($command = null) { { switch ($command) { switch ($command) { case 'sync_activeUsers': case 'sync_activeUsers': $this->out('Indexes user activity by guid and counts per day'); $this->out('Indexes user activity by guid and counts per day'); $this->out('--incremental sync current day for estimates of state change (overrides from and to params)'); $this->out('--from={timestamp} the day to start counting. Default is yesterday at midnight'); $this->out('--from={timestamp} the day to start counting. Default is yesterday at midnight'); $this->out('--to={timestamp} the day to stop counting. Default is yesterday at midnight'); $this->out('--to={timestamp} the day to stop counting. Default is yesterday at midnight'); $this->out('--rangeOffset={number of days} the number of days to look back into the past. Default is 7'); $this->out('--rangeOffset={number of days} the number of days to look back into the past. Default is 7'); Loading Loading @@ -46,9 +44,17 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf { { error_reporting(E_ALL); error_reporting(E_ALL); ini_set('display_errors', 1); ini_set('display_errors', 1); $estimate = false; if ($this->getOpt('incremental')) { $estimate = true; $from = strtotime('midnight +1 day'); //run throughout the day, provides estimates $to = $from; } else { $from = (strtotime('midnight', $this->getOpt('from')) ?: strtotime('midnight yesterday')); $from = (strtotime('midnight', $this->getOpt('from')) ?: strtotime('midnight yesterday')); $to = (strtotime('midnight', $this->getOpt('to')) ?: strtotime('midnight yesterday')); $to = (strtotime('midnight', $this->getOpt('to')) ?: strtotime('midnight yesterday')); } $rangeOffset = getopt('rangeOffset') ?: 7; $rangeOffset = getopt('rangeOffset') ?: 7; $mode = strtolower($this->getOpt('mode')) ?: 'notify'; $mode = strtolower($this->getOpt('mode')) ?: 'notify'; $this->out('Collecting user activity'); $this->out('Collecting user activity'); Loading @@ -61,7 +67,7 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf ->sync(); ->sync(); if ($mode === 'notify') { if ($mode === 'notify') { $this->out('Sending notifications'); $this->out('Sending notifications'); $manager->emitStateChanges(); $manager->emitStateChanges($estimate); } } $from = strtotime('+1 day', $from); $from = strtotime('+1 day', $from); } } Loading Core/Analytics/Delegates/UpdateUserStateEstimate.php 0 → 100644 +69 −0 Original line number Original line Diff line number Diff line <?php namespace Minds\Core\Analytics\Delegates; use Minds\Core\Analytics\UserStates\UserState; use Minds\Core\Events\Dispatcher; use Minds\Core\Notification\Notification; use Minds\Entities\User; class UpdateUserStateEstimate { /** @var User */ private $user; /** @var UserState */ private $userState; /** @var int */ private $estimateStateChange; public function __construct(UserState $userState, User $user = null) { $this->userState = $userState; $this->user = $user ?? new User($userState->getUserGuid()); $this->estimateStateChange = 0; } public function update(): void { $this->deriveEstimateChange(); $this->updateUserEntity(); $this->sendStateChangeNotification(); } private function deriveEstimateChange(): void { $this->estimateStateChange = UserState::stateChange($this->user->getUserStateToday(), $this->userState->getState()); } private function updateUserEntity(): void { $this->user->setUserStateToday($this->userState->getState()) ->setUserStateTodayUpdatedMs($this->userState->getReferenceDateMs()) ->save(); } private function sendStateChangeNotification(): void { $data = [ $this->user->getGUID(), $this->user->getUserState(), $this->user->getUserStateToday(), $this->userState->getStateChange(), $this->estimateStateChange ]; error_log(implode('|', $data)); if ($this->estimateStateChange < 0 && $this->userState->getStateChange() < 0) { $notificationView = 'rewards_state_decrease_today'; Dispatcher::trigger('notification', 'reward', [ 'to' => [ $this->userState->getUserGuid() ], 'from' => Notification::SYSTEM_ENTITY, 'notification_view' => $notificationView, 'params' => $this->userState->export() ]); } } } Core/Analytics/Events.php +5 −0 Original line number Original line Diff line number Diff line Loading @@ -21,5 +21,10 @@ class Events $userState = Core\Analytics\UserStates\UserState::fromArray($event->getParameters()); $userState = Core\Analytics\UserStates\UserState::fromArray($event->getParameters()); (new Core\Analytics\Delegates\UpdateUserState($userState))->update(); (new Core\Analytics\Delegates\UpdateUserState($userState))->update(); }); }); $this->eventsDispatcher->register('user_state_change_estimate', 'all', function (Core\Events\Event $event) { $userState = Core\Analytics\UserStates\UserState::fromArray($event->getParameters()); (new Core\Analytics\Delegates\UpdateUserStateEstimate($userState))->update(); }); } } } } Core/Analytics/UserStates/Manager.php +15 −4 Original line number Original line Diff line number Diff line Loading @@ -2,6 +2,7 @@ namespace Minds\Core\Analytics\UserStates; namespace Minds\Core\Analytics\UserStates; use Minds\Core\Data\ElasticSearch\Client; use Minds\Core\Di\Di; use Minds\Core\Di\Di; use Minds\Core\Queue; use Minds\Core\Queue; Loading @@ -28,6 +29,9 @@ class Manager /** @var array $pendingBulkInserts * */ /** @var array $pendingBulkInserts * */ private $pendingBulkInserts = []; private $pendingBulkInserts = []; /** @var Client */ private $es; public function __construct($client = null, $index = null, $queue = null, $activeUsersIterator = null, $userStateIterator = null) public function __construct($client = null, $index = null, $queue = null, $activeUsersIterator = null, $userStateIterator = null) { { $this->es = $client ?: Di::_()->get('Database\ElasticSearch'); $this->es = $client ?: Di::_()->get('Database\ElasticSearch'); Loading Loading @@ -67,7 +71,7 @@ class Manager $this->bulk(); $this->bulk(); } } public function emitStateChanges() public function emitStateChanges(bool $estimate = false) { { $this->userStateIterator->setReferenceDate($this->referenceDate); $this->userStateIterator->setReferenceDate($this->referenceDate); Loading @@ -75,9 +79,16 @@ class Manager foreach ($this->userStateIterator as $userState) { foreach ($this->userStateIterator as $userState) { //Reindex with previous state //Reindex with previous state $this->index($userState); $this->index($userState); $this->queue->send([ 'user_state_change' => $userState->export(), $payload = [ ]); 'user_state_change' => $userState->export() ]; if ($estimate) { $payload['estimate'] = true; } $this->queue->send($payload); } } $this->bulk(); $this->bulk(); } } Loading Core/Queue/Runners/UserStateChange.php +3 −2 Original line number Original line Diff line number Diff line Loading @@ -15,9 +15,10 @@ class UserStateChange implements Interfaces\QueueRunner { { $client = Queue\Client::Build(); $client = Queue\Client::Build(); $client->setQueue('UserStateChanges') $client->setQueue('UserStateChanges') ->receive(function ($data) use ($mailer) { ->receive(function ($data) { $data = $data->getData(); $data = $data->getData(); $result = Dispatcher::trigger('user_state_change', $data['user_state_change']['state'], $data['user_state_change']); $event = isset($data['estimate']) ? 'user_state_change_estimate' : 'user_state_change'; Dispatcher::trigger($event, $data['user_state_change']['state'], $data['user_state_change']); }); }); } } } } Loading
Controllers/Cli/Analytics.php +12 −6 Original line number Original line Diff line number Diff line Loading @@ -10,14 +10,12 @@ use Minds\Interfaces; class Analytics extends Cli\Controller implements Interfaces\CliControllerInterface class Analytics extends Cli\Controller implements Interfaces\CliControllerInterface { { private $start; private $elasticsearch; public function help($command = null) public function help($command = null) { { switch ($command) { switch ($command) { case 'sync_activeUsers': case 'sync_activeUsers': $this->out('Indexes user activity by guid and counts per day'); $this->out('Indexes user activity by guid and counts per day'); $this->out('--incremental sync current day for estimates of state change (overrides from and to params)'); $this->out('--from={timestamp} the day to start counting. Default is yesterday at midnight'); $this->out('--from={timestamp} the day to start counting. Default is yesterday at midnight'); $this->out('--to={timestamp} the day to stop counting. Default is yesterday at midnight'); $this->out('--to={timestamp} the day to stop counting. Default is yesterday at midnight'); $this->out('--rangeOffset={number of days} the number of days to look back into the past. Default is 7'); $this->out('--rangeOffset={number of days} the number of days to look back into the past. Default is 7'); Loading Loading @@ -46,9 +44,17 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf { { error_reporting(E_ALL); error_reporting(E_ALL); ini_set('display_errors', 1); ini_set('display_errors', 1); $estimate = false; if ($this->getOpt('incremental')) { $estimate = true; $from = strtotime('midnight +1 day'); //run throughout the day, provides estimates $to = $from; } else { $from = (strtotime('midnight', $this->getOpt('from')) ?: strtotime('midnight yesterday')); $from = (strtotime('midnight', $this->getOpt('from')) ?: strtotime('midnight yesterday')); $to = (strtotime('midnight', $this->getOpt('to')) ?: strtotime('midnight yesterday')); $to = (strtotime('midnight', $this->getOpt('to')) ?: strtotime('midnight yesterday')); } $rangeOffset = getopt('rangeOffset') ?: 7; $rangeOffset = getopt('rangeOffset') ?: 7; $mode = strtolower($this->getOpt('mode')) ?: 'notify'; $mode = strtolower($this->getOpt('mode')) ?: 'notify'; $this->out('Collecting user activity'); $this->out('Collecting user activity'); Loading @@ -61,7 +67,7 @@ class Analytics extends Cli\Controller implements Interfaces\CliControllerInterf ->sync(); ->sync(); if ($mode === 'notify') { if ($mode === 'notify') { $this->out('Sending notifications'); $this->out('Sending notifications'); $manager->emitStateChanges(); $manager->emitStateChanges($estimate); } } $from = strtotime('+1 day', $from); $from = strtotime('+1 day', $from); } } Loading
Core/Analytics/Delegates/UpdateUserStateEstimate.php 0 → 100644 +69 −0 Original line number Original line Diff line number Diff line <?php namespace Minds\Core\Analytics\Delegates; use Minds\Core\Analytics\UserStates\UserState; use Minds\Core\Events\Dispatcher; use Minds\Core\Notification\Notification; use Minds\Entities\User; class UpdateUserStateEstimate { /** @var User */ private $user; /** @var UserState */ private $userState; /** @var int */ private $estimateStateChange; public function __construct(UserState $userState, User $user = null) { $this->userState = $userState; $this->user = $user ?? new User($userState->getUserGuid()); $this->estimateStateChange = 0; } public function update(): void { $this->deriveEstimateChange(); $this->updateUserEntity(); $this->sendStateChangeNotification(); } private function deriveEstimateChange(): void { $this->estimateStateChange = UserState::stateChange($this->user->getUserStateToday(), $this->userState->getState()); } private function updateUserEntity(): void { $this->user->setUserStateToday($this->userState->getState()) ->setUserStateTodayUpdatedMs($this->userState->getReferenceDateMs()) ->save(); } private function sendStateChangeNotification(): void { $data = [ $this->user->getGUID(), $this->user->getUserState(), $this->user->getUserStateToday(), $this->userState->getStateChange(), $this->estimateStateChange ]; error_log(implode('|', $data)); if ($this->estimateStateChange < 0 && $this->userState->getStateChange() < 0) { $notificationView = 'rewards_state_decrease_today'; Dispatcher::trigger('notification', 'reward', [ 'to' => [ $this->userState->getUserGuid() ], 'from' => Notification::SYSTEM_ENTITY, 'notification_view' => $notificationView, 'params' => $this->userState->export() ]); } } }
Core/Analytics/Events.php +5 −0 Original line number Original line Diff line number Diff line Loading @@ -21,5 +21,10 @@ class Events $userState = Core\Analytics\UserStates\UserState::fromArray($event->getParameters()); $userState = Core\Analytics\UserStates\UserState::fromArray($event->getParameters()); (new Core\Analytics\Delegates\UpdateUserState($userState))->update(); (new Core\Analytics\Delegates\UpdateUserState($userState))->update(); }); }); $this->eventsDispatcher->register('user_state_change_estimate', 'all', function (Core\Events\Event $event) { $userState = Core\Analytics\UserStates\UserState::fromArray($event->getParameters()); (new Core\Analytics\Delegates\UpdateUserStateEstimate($userState))->update(); }); } } } }
Core/Analytics/UserStates/Manager.php +15 −4 Original line number Original line Diff line number Diff line Loading @@ -2,6 +2,7 @@ namespace Minds\Core\Analytics\UserStates; namespace Minds\Core\Analytics\UserStates; use Minds\Core\Data\ElasticSearch\Client; use Minds\Core\Di\Di; use Minds\Core\Di\Di; use Minds\Core\Queue; use Minds\Core\Queue; Loading @@ -28,6 +29,9 @@ class Manager /** @var array $pendingBulkInserts * */ /** @var array $pendingBulkInserts * */ private $pendingBulkInserts = []; private $pendingBulkInserts = []; /** @var Client */ private $es; public function __construct($client = null, $index = null, $queue = null, $activeUsersIterator = null, $userStateIterator = null) public function __construct($client = null, $index = null, $queue = null, $activeUsersIterator = null, $userStateIterator = null) { { $this->es = $client ?: Di::_()->get('Database\ElasticSearch'); $this->es = $client ?: Di::_()->get('Database\ElasticSearch'); Loading Loading @@ -67,7 +71,7 @@ class Manager $this->bulk(); $this->bulk(); } } public function emitStateChanges() public function emitStateChanges(bool $estimate = false) { { $this->userStateIterator->setReferenceDate($this->referenceDate); $this->userStateIterator->setReferenceDate($this->referenceDate); Loading @@ -75,9 +79,16 @@ class Manager foreach ($this->userStateIterator as $userState) { foreach ($this->userStateIterator as $userState) { //Reindex with previous state //Reindex with previous state $this->index($userState); $this->index($userState); $this->queue->send([ 'user_state_change' => $userState->export(), $payload = [ ]); 'user_state_change' => $userState->export() ]; if ($estimate) { $payload['estimate'] = true; } $this->queue->send($payload); } } $this->bulk(); $this->bulk(); } } Loading
Core/Queue/Runners/UserStateChange.php +3 −2 Original line number Original line Diff line number Diff line Loading @@ -15,9 +15,10 @@ class UserStateChange implements Interfaces\QueueRunner { { $client = Queue\Client::Build(); $client = Queue\Client::Build(); $client->setQueue('UserStateChanges') $client->setQueue('UserStateChanges') ->receive(function ($data) use ($mailer) { ->receive(function ($data) { $data = $data->getData(); $data = $data->getData(); $result = Dispatcher::trigger('user_state_change', $data['user_state_change']['state'], $data['user_state_change']); $event = isset($data['estimate']) ? 'user_state_change_estimate' : 'user_state_change'; Dispatcher::trigger($event, $data['user_state_change']['state'], $data['user_state_change']); }); }); } } } }