Skip to content
Commits on Source (2)
<?php
/**
* Cassandra Repository.
*/
namespace Minds\Core\Notification;
use Minds\Core\Di\Di;
use Minds\Common\Repository\Response;
use Cassandra\Bigint;
use Cassandra\Timeuuid;
use Cassandra\Timestamp;
use Minds\Common\Urn;
use Minds\Core\Data\Cassandra\Prepared;
class CassandraRepository
{
const NOTIFICATION_TTL = ((60 * 60) * 24) * 30; // 30 days
/** @var $cql */
private $cql;
public function __construct($cql = null, $urn = null)
{
$this->cql = $cql ?: Di::_()->get('Database\Cassandra\Cql');
$this->urn = $urn ?: new Urn;
}
/**
* Get a list of notifications.
*/
public function getList($opts)
{
$opts = array_merge([
'to_guid' => null,
'type_group' => null,
'uuid' => null,
'offset' => null,
'limit' => 12,
], $opts);
if (!$opts['to_guid']) {
throw new \Exception('to_guid must be provided');
}
$statement = "SELECT * FROM notifications
WHERE to_guid = ?";
$values = [
new Bigint($opts['to_guid']),
];
if ($opts['uuid']) {
$statement .= " AND uuid = ?";
$values[] = new Timeuuid($opts['uuid']);
}
if ($opts['type_group']) {
$statement = "SELECT * FROM notifications_by_type_group
WHERE to_guid = ?
AND type_group = ?";
$values[] = $opts['type_group'];
}
$query = new Prepared\Custom();
$query->query($statement, $values);
$query->setOpts([
'page_size' => $opts['limit'],
'paging_state_token' => base64_decode($opts['offset']),
]);
try {
$result = $this->cql->request($query);
} catch (\Exception $e) {
return false;
}
$response = new Response();
foreach ($result as $row) {
$notification = new Notification();
$notification->setUuid($row['uuid']->uuid() ?: null)
->setToGuid($row['to_guid'] ? (int) $row['to_guid']->value(): null)
->setFromGuid($row['from_guid'] ? (int) $row['from_guid']->value(): null)
->setEntityGuid((string) $row['entity_guid']) // REMOVE ONCE FULLY ON CASSANDRA
->setEntityUrn($row['entity_urn'])
->setCreatedTimestamp($row['created_timestamp'] ? $row['created_timestamp']->time() : null)
->setReadTimestamp($row['read_timestamp'] ? $row['read_timestamp']->time() : null)
->setType($row['type'])
->setData(json_decode($row['data'], true));
$response[] = $notification;
}
$response->setPagingToken(base64_encode($result->pagingStateToken()));
return $response;
}
/**
* Get a single notification.
* @param $urn
* @return Notification
*/
public function get($urn)
{
list ($to_guid, $uuid) = explode('-', $this->urn->setUrn($urn)->getNss());
$response = $this->getList([
'to_guid' => $to_guid,
'uuid' => $uuid,
'limit' => 1,
]);
return $response[0];
}
/**
* Add a notification to the database.
*
* @param Notification $notification
*
* @return Notification|bool
*/
public function add($notification)
{
if (!$notification->getUuid()) {
$notification->setUuid((new Timeuuid($notification->getUuid() ?? time() * 1000))->uuid());
}
$statement = 'INSERT INTO notifications (
to_guid,
uuid,
type,
type_group,
from_guid,
entity_guid,
entity_urn,
created_timestamp,
read_timestamp,
data
) VALUES (?,?,?,?,?,?,?,?,?, ?)
USING TTL ?';
$values = [
new Bigint($notification->getToGuid()),
new Timeuuid($notification->getUuid()),
(string) $notification->getType(),
Manager::getGroupFromType($notification->getType()),
new Bigint($notification->getFromGuid()),
(string) $notification->getEntityGuid(), // REMOVE ONCE FULLY ON CASSANDRA
(string) $notification->getEntityUrn(),
new Timestamp($notification->getCreatedTimestamp() ?: time()),
$notification->getReadTimestamp() ? new Timestamp($notification->getReadTimestamp()) : null,
json_encode($notification->getData()),
static::NOTIFICATION_TTL,
];
$query = new Prepared\Custom();
$query->query($statement, $values);
try {
$success = $this->cql->request($query);
} catch (\Exception $e) {
return false;
}
return $success;
}
// TODO
public function update($notification, $fields)
{
}
// TODO
public function delete($uuid)
{
}
}
......@@ -75,6 +75,7 @@ class Events
->setToGuid($params['to'])
->setFromGuid($from_user->getGuid())
->setEntityGuid($entityGuid)
->setEntityUrn($entity->getUrn())
->setType($params['notification_view'])
->setData($data);
......
......@@ -9,6 +9,7 @@ use Minds\Common\Repository\Response;
use Minds\Core\Config;
use Minds\Core\Di\Di;
use Minds\Entities\User;
use Minds\Core\Features\Manager as FeaturesManager;
class Manager
{
......@@ -19,17 +20,26 @@ class Manager
/** @var Repository $repository */
private $repository;
/** @var LegacyRepository $legacyRepository */
private $legacyRepository;
/** @var CassandraRepository $cassandraRepository */
private $cassandraRepository;
/** @var FeaturesManager $features */
private $features;
/** @var User $user */
private $user;
public function __construct($config = null, $repository = null, $legacyRepository = null)
public function __construct(
$config = null,
$repository = null,
$cassandraRepository = null,
$features = null
)
{
$this->config = $config ?: Di::_()->get('Config');
$this->repository = $repository ?: new Repository;
$this->legacyRepository = $legacyRepository ?: new LegacyRepository;
$this->cassandraRepository = $cassandraRepository ?: new CassandraRepository;
$this->features = $features ?: new FeaturesManager;
}
/**
......@@ -45,12 +55,15 @@ class Manager
/**
* Return a single notification
* @param $uuid
* @param $urn
* @return Notification
*/
public function getSingle($uuid)
public function getSingle($urn)
{
return $this->repository->get($uuid);
if (strpos($urn, 'urn:') !== FALSE) {
return $this->cassandraRepository->get($urn);
}
return $this->repository->get($urn);
}
/**
......@@ -68,6 +81,8 @@ class Manager
'offset' => '',
], $opts);
$opts['type_group'] = $opts['type'];
switch ($opts['type']) {
case "tags":
$opts['types'] = [
......@@ -123,6 +138,10 @@ class Manager
break;
}
if ($this->features->has('cassandra-notifications')) {
return $this->cassandraRepository->getList($opts);
}
return $this->repository->getList($opts);
}
......@@ -134,6 +153,7 @@ class Manager
public function add($notification)
{
try {
$this->cassandraRepository->add($notification);
$uuid = $this->repository->add($notification);
return $uuid;
......@@ -145,4 +165,53 @@ class Manager
}
}
/**
* @param $type
* @return string
*/
public static function getGroupFromType($type)
{
switch ($type) {
case 'tag':
return 'tags';
break;
case 'friends':
case 'welcome_chat':
case 'welcome_discover':
return 'subscriptions';
break;
case 'group_invite':
case 'group_kick':
case 'group_activity':
return 'groups';
break;
case 'comment':
return 'comments';
break;
case 'like':
case 'downvote':
return 'votes';
break;
case 'remind':
return 'reminds';
break;
case 'boost_gift':
case 'boost_submitted':
case 'boost_submitted_p2p':
case 'boost_request':
case 'boost_rejected':
case 'boost_revoked':
case 'boost_accepted':
case 'boost_completed':
case 'boost_peer_request':
case 'boost_peer_accepted':
case 'boost_peer_rejected':
case 'welcome_points':
case 'welcome_boost':
return 'boosts';
break;
}
return 'unknown';
}
}
......@@ -9,14 +9,16 @@ use Minds\Traits\MagicAttributes;
/**
* Class Notification
* @package Minds\Core\Notification
* @method string getUUID()
* @method Notification setUUID(string $value)
* @method string getUuid()
* @method Notification setUuid(string $value)
* @method string getToGuid()
* @method Notification setToGuid(string $value)
* @method string getFromGuid()
* @method Notification setFromGuid(string $value)
* @method string getEntityGuid()
* @method Notification setEntityGuid(string $value)
* @method string getEntityUrn()
* @method Notification getEntityUrn(string $value)
* @method string getType()
* @method Notification setType(string $value)
* @method array getData()
......@@ -44,6 +46,9 @@ class Notification
/** @param string $entityGuid */
private $entityGuid;
/** @param string $entityUrn */
private $entityUrn;
/** @param string $type */
private $type;
......@@ -59,6 +64,15 @@ class Notification
/** @var int $readTimestamp */
private $readTimestamp;
/**
* Return the UUID of the notification
* @return string
*/
public function getUrn()
{
return "urn:notification:" . implode('-', [ $this->toGuid, $this->uuid ]);
}
/**
* Export
* @return array
......@@ -66,9 +80,13 @@ class Notification
public function export()
{
return [
'uuid' => $this->getUuid(),
'toGuid' => $this->getToGuid(),
'to_guid' => $this->getToGuid(),
'fromGuid' => $this->getFromGuid(),
'from_guid' => $this->getFromGuid(),
'entityGuid' => $this->getEntityGuid(),
'entity_urn' => $this->getEntityUrn(),
'type' => $this->getType(),
'data' => $this->getData(),
];
......
......@@ -151,6 +151,7 @@ class Repository
}
$query = 'INSERT INTO notifications (
uuid,
to_guid,
from_guid,
entity_guid,
......@@ -162,6 +163,7 @@ class Repository
$values = [];
foreach ($notifications as $notification) {
$values = array_merge($values, [
$notification->getUuid(),
$notification->getToGuid(),
$notification->getFromGuid(),
$notification->getEntityGuid(),
......@@ -171,7 +173,7 @@ class Repository
]);
}
$query .= implode(',', array_fill(0, count($notifications), '(?,?,?,?,?,?)'));
$query .= implode(',', array_fill(0, count($notifications), '(?,?,?,?,?,?,?)'));
$query .= ' RETURNING UUID';
......
......@@ -905,26 +905,28 @@ CREATE TABLE minds.translations (
AND speculative_retry = '99PERCENTILE';
CREATE TABLE minds.notifications (
owner_guid varint,
guid varint,
data text,
to_guid bigint,
uuid timeuuid,
type text,
PRIMARY KEY (owner_guid, guid)
) WITH CLUSTERING ORDER BY (guid DESC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
type_group text,
from_guid bigint,
entity_guid text,
entity_urn text,
created_timestamp timestamp,
read_timestamp timestamp,
data text,
PRIMARY KEY (to_guid, uuid)
) WITH CLUSTERING ORDER BY (uuid DESC);
CREATE MATERIALIZED VIEW minds.notifications_by_type_group AS
SELECT *
FROM minds.notifications
WHERE to_guid IS NOT NULL
AND uuid IS NOT NULL
AND type_group IS NOT NULL
PRIMARY KEY (to_guid, type_group, uuid)
WITH CLUSTERING ORDER BY (type_group DESC, uuid DESC);
CREATE TABLE minds.friendsof (
key text,
......
<?php
namespace Spec\Minds\Core\Notification;
use Minds\Core\Notification\CassandraRepository;
use Minds\Core\Data\Cassandra\Client;
use Minds\Core\Notification\Notification;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
use Spec\Minds\Mocks\Cassandra\Rows;
use Cassandra\Timeuuid;
use Cassandra\Bigint;
use Cassandra\Timestamp;
class CassandraRepositorySpec extends ObjectBehavior
{
/** @var Client */
private $cql;
function let(Client $cql)
{
$this->beConstructedWith($cql);
$this->cql = $cql;
}
function it_is_initializable()
{
$this->shouldHaveType(CassandraRepository::class);
}
function it_should_return_a_notification_from_uuid()
{
$this->cql->request(Argument::that(function($prepared) {
$values = $prepared->build()['values'];
return $values[0]->value() == 123
&& $values[1]->uuid() == 'uuid';
}))
->willReturn(new Rows([
[
'uuid' => new Timeuuid(time()),
'to_guid' => new Bigint(123),
'from_guid' => new Bigint(456),
'entity_guid' => '789',
'entity_urn' => 'urn:entity:789',
'created_timestamp' => new Timestamp(time()),
'read_timestamp' => null,
'type' => 'like',
'data' => '',
]
], ''));
$notification = $this->get('urn:notification:123-uuid');
$notification->getToGuid()
->shouldBe(123);
$notification->getFromGuid()
->shouldBe(456);
$notification->getEntityGuid()
->shouldBe('789');
$notification->getEntityUrn()
->shouldBe('urn:entity:789');
$notification->getCreatedTimestamp()
->shouldBe(time());
$notification->getType()
->shouldBe('like');
}
function it_should_add_to_database(Notification $notification)
{
$this->cql->request(Argument::that(function($prepared) {
$values = $prepared->build()['values'];
return $values[0]->value() == 123;
}))
->shouldBeCalled()
->willReturn(true);
$notification->getToGuid()
->willReturn(123);
$notification->getUuid()
->willReturn(null);
$notification->setUuid(Argument::type('string'))
->shouldBeCalled();
$notification->getFromGuid()
->willReturn(456);
$notification->getType()
->willReturn('boost');
$notification->getEntityGuid()
->willReturn(789);
$notification->getEntityUrn()
->willReturn('urn:entity:789');
$notification->getCreatedTimestamp()
->willReturn(time());
$notification->getReadTimestamp()
->willReturn(null);
$notification->getData()
->willReturn(null);
$this->add($notification)
->shouldReturn(true);
}
function it_should_load_notification_for_user()
{
$this->cql->request(Argument::that(function($prepared) {
$statement = preg_replace('/\s+/', ' ', $prepared->build()['string']);
$values = $prepared->build()['values'];
return strpos($statement, 'SELECT * FROM notifications WHERE to_guid', 0) !== FALSE
&& $values[0]->value() == 123;
}))
->willReturn(new Rows([
[
'uuid' => new Timeuuid(time()),
'to_guid' => new Bigint(123),
'from_guid' => new Bigint(456),
'entity_guid' => '789',
'entity_urn' => 'urn:entity:789',
'created_timestamp' => new Timestamp(time()),
'read_timestamp' => null,
'type' => 'like',
'data' => '',
]
], ''));
$notifications = $this->getList([
'limit' => 24,
'to_guid' => 123,
]);
$notifications->shouldHaveCount(1);
}
function it_should_load_notification_from_type_group()
{
$this->cql->request(Argument::that(function($prepared) {
$statement = $prepared->build()['string'];
$values = $prepared->build()['values'];
return strpos($statement, 'SELECT * FROM notifications_by_type_group', 0) !== FALSE
&& $values[0]->value() == 123
&& $values[1] == 'votes';
}))
->willReturn(new Rows([
[
'uuid' => new Timeuuid(time()),
'to_guid' => new Bigint(123),
'from_guid' => new Bigint(456),
'entity_guid' => '789',
'entity_urn' => 'urn:entity:789',
'created_timestamp' => new Timestamp(time()),
'read_timestamp' => null,
'type' => 'like',
'data' => '',
]
], ''));
$notifications = $this->getList([
'limit' => 24,
'to_guid' => 123,
'type_group' => 'votes',
]);
$notifications->shouldHaveCount(1);
}
}
......@@ -7,7 +7,10 @@ use Minds\Core\Notification\LegacyRepository;
use Minds\Core\Notification\Manager;
use Minds\Core\Notification\Notification;
use Minds\Core\Notification\Repository;
use Minds\Core\Notification\CassandraRepository;
use Minds\Core\Features\Manager as FeaturesManager;
use PhpSpec\ObjectBehavior;
use Prophecy\Argument;
class ManagerSpec extends ObjectBehavior
{
......@@ -17,16 +20,25 @@ class ManagerSpec extends ObjectBehavior
/** @var Repository */
private $repository;
/** @var LegacyRepository */
private $legacyRepository;
/** @var CassandraRepository */
private $cassandraRepository;
function let(Config $config, Repository $repository, LegacyRepository $legacyRepository)
/** @var FeaturesManager */
private $features;
function let(
Config $config,
Repository $repository,
CassandraRepository $cassandraRepository,
FeaturesManager $features
)
{
$this->config = $config;
$this->repository = $repository;
$this->legacyRepository = $legacyRepository;
$this->cassandraRepository = $cassandraRepository;
$this->features = $features;
$this->beConstructedWith($config, $repository, $legacyRepository);
$this->beConstructedWith($config, $repository, $cassandraRepository, $features);
}
function it_is_initializable()
......@@ -42,4 +54,39 @@ class ManagerSpec extends ObjectBehavior
$this->getSingle('1234')->shouldReturn($notification);
}
function it_should_get_from_cassandra_if_urn_provided(Notification $notification)
{
$this->cassandraRepository->get('urn:notification:1234')
->shouldBeCalled()
->willReturn($notification);
$this->getSingle('urn:notification:1234')->shouldReturn($notification);
}
function it_should_get_list_from_cassandra_if_feature_enabled(Notification $notification)
{
$this->features->has('cassandra-notifications')
->willReturn(true);
$this->cassandraRepository->getList(Argument::that(function($opts) {
return $opts['limit'] === 6;
}))
->shouldBeCalled()
->willReturn([ $notification ]);
$response = $this->getList([ 'limit' => 6 ]);
$response[0]->shouldBe($notification);
}
function it_should_add_to_both_repositories(Notification $notification)
{
$this->repository->add($notification)
->shouldBeCalled();
$this->cassandraRepository->add($notification)
->shouldBeCalled();
$this->add($notification);
}
}