Commit 9513366e authored by Giorgio Azzinnaro's avatar Giorgio Azzinnaro

adding log

parent ab2bf57c
......@@ -48,7 +48,10 @@ public:
bool Put(const Message & message)
{
this->storage->Store(this->marshaller->Marshal(message));
const protobuf::MessageTreeNode & messageTree = this->marshaller->Marshal(message);
// TODO Iterate over MessageTreeNode and store all
this->storage->Store(messageTree.message());
// TODO Check exceptions
return true;
......
find_package(Boost REQUIRED COMPONENTS filesystem) # Boost Filesystem scans the .proto directory
add_definitions(-DBOOST_LOG_DYN_LINK) # This is for the Boost Log library to load the proper target namespace for the current platform
find_package(Boost REQUIRED COMPONENTS log filesystem) # Boost Filesystem scans the .proto directory
add_library(profanedb_format STATIC protobuf/marshaller.cpp protobuf/loader.cpp)
target_link_libraries(profanedb_format profanedb_protobuf ${Boost_LIBRARIES})
......
......@@ -32,13 +32,21 @@ using google::protobuf::Descriptor;
using google::protobuf::DescriptorProto;
using google::protobuf::FieldDescriptor;
using google::protobuf::FieldDescriptorProto_Type;
using google::protobuf::DescriptorPool;
using profanedb::protobuf::Key;
profanedb::format::protobuf::Loader::RootSourceTree::RootSourceTree(std::initializer_list<path> paths)
: paths(paths)
{
for (const auto & path: paths)
if (paths.size() == 0)
throw std::runtime_error("Mapping is empty");
for (const auto & path: paths) {
this->MapPath("", path.string());
BOOST_LOG_TRIVIAL(debug) << "Mapping " << path.string();
}
ZeroCopyInputStream * inputStream = this->Open("");
if (inputStream == nullptr)
......@@ -62,14 +70,22 @@ profanedb::format::protobuf::Loader::Loader(
// Just in case schema is defined in more than one place
for (const auto & path: schemaSourceTree->paths) {
BOOST_LOG_TRIVIAL(debug) << "Loading schema at " << path.string();
// Iterate through all files in that mapped path
for (const auto & file: recursive_directory_iterator(path, symlink_option::recurse)) {
// Only consider files ending in .proto
// TODO This might be configured differently
if (file.path().extension() == ".proto") {
// The file is now retrieved, and its path for Protobuf must be relative to the mapping
this->ParseFile(schemaPool.FindFileByName(file.path().lexically_relative(path).string()));
// it's parsed, normalized (nested keyable messages are removed)
FileDescriptorProto normalizedProto = this->ParseFile(schemaPool.FindFileByName(file.path().lexically_relative(path).string()));
BOOST_LOG_TRIVIAL(debug) << "Adding normalized proto " << normalizedProto.name();
// The normalizedDescriptorDb keeps these new Descriptors
this->normalizedDescriptorDb.AddAndOwn(&normalizedProto);
}
}
}
......@@ -78,13 +94,16 @@ profanedb::format::protobuf::Loader::Loader(
FileDescriptorProto profanedb::format::protobuf::Loader::ParseFile(
const FileDescriptor * fileDescriptor)
{
BOOST_LOG_TRIVIAL(debug) << "Parsing file " << fileDescriptor->name();
// BOOST_LOG_TRIVIAL(trace) << fileDescriptor->DebugString(); // Redundant, is done for each message later
// A FileDescriptorProto is needed to edit messages and populate the normalized descriptor database
FileDescriptorProto normFileDescProto;
fileDescriptor->CopyTo(&normFileDescProto);
// For each message in the file...
for (int i = 0; i < fileDescriptor->message_type_count(); i++) {
// ... parse it, make nested messages Key objects
// ... parse it, make nested messages of type profanedb.protobuf.Key
*normFileDescProto.mutable_message_type(i) = this->ParseAndNormalizeDescriptor(fileDescriptor->message_type(i));
}
......@@ -94,6 +113,9 @@ FileDescriptorProto profanedb::format::protobuf::Loader::ParseFile(
DescriptorProto profanedb::format::protobuf::Loader::ParseAndNormalizeDescriptor(
const Descriptor * descriptor)
{
BOOST_LOG_TRIVIAL(debug) << "Parsing descriptor " << descriptor->full_name();
BOOST_LOG_TRIVIAL(trace) << descriptor->DebugString();
DescriptorProto normDescProto;
descriptor->CopyTo(&normDescProto);
......@@ -117,6 +139,10 @@ DescriptorProto profanedb::format::protobuf::Loader::ParseAndNormalizeDescriptor
normDescProto.mutable_field(k)->set_type(
FieldDescriptorProto_Type::FieldDescriptorProto_Type_TYPE_MESSAGE); // Redundant, is message already
normDescProto.mutable_field(k)->set_type_name(Key::descriptor()->full_name()); // TODO Should include Key in normalizedPool
BOOST_LOG_TRIVIAL(trace) << "Message " << descriptor->name()
<< " has keyable nested message " << field->name()
<< " of type " << nestedMessage->name();
}
}
......@@ -133,13 +159,12 @@ bool profanedb::format::protobuf::Loader::IsKeyable(const Descriptor * descripto
return false;
}
const google::protobuf::DescriptorPool & profanedb::format::protobuf::Loader::GetSchemaPool() const
const DescriptorPool & profanedb::format::protobuf::Loader::GetSchemaPool() const
{
return this->schemaPool;
}
const google::protobuf::DescriptorPool & profanedb::format::protobuf::Loader::GetNormalizedPool() const
const DescriptorPool & profanedb::format::protobuf::Loader::GetNormalizedPool() const
{
return this->normalizedPool;
}
......@@ -28,6 +28,7 @@
#include <google/protobuf/compiler/importer.h>
#include <boost/filesystem.hpp>
#include <boost/log/trivial.hpp>
namespace profanedb {
namespace format {
......
......@@ -31,7 +31,11 @@ using profanedb::protobuf::MessageTreeNode;
using profanedb::protobuf::StorableMessage;
using profanedb::protobuf::Key;
profanedb::format::protobuf::Marshaller::Marshaller(
namespace profanedb {
namespace format {
namespace protobuf {
Marshaller::Marshaller(
std::shared_ptr<Storage> storage,
std::shared_ptr<Loader> loader)
: loader(loader)
......@@ -39,15 +43,14 @@ profanedb::format::protobuf::Marshaller::Marshaller(
{
}
MessageTreeNode profanedb::format::protobuf::Marshaller::Marshal(const Message & message)
MessageTreeNode Marshaller::Marshal(const Message & message)
{
MessageTreeNode messageTree;
// The normalized message will be filled with data coming from input message,
// replacing references to other objects with their keys.
// It will then be serialized and set as storable message payload in messageTree;
Message * normalizedMessage =
this->messageFactory.GetPrototype(loader->GetNormalizedPool().FindMessageTypeByName(message.GetTypeName()))->New();
Message * normalizedMessage = this->CreateMessage(NORMALIZED, message.GetTypeName());
// Only fields which are set in the message are processed
std::vector< const FieldDescriptor * > setFields;
......@@ -96,15 +99,13 @@ MessageTreeNode profanedb::format::protobuf::Marshaller::Marshal(const Message &
return messageTree;
}
const Message & profanedb::format::protobuf::Marshaller::Unmarshal(const StorableMessage & storable)
const Message & Marshaller::Unmarshal(const StorableMessage & storable)
{
// An empty normalized message is generated using the Key
Message * normalizedMessage = this->messageFactory.GetPrototype(
loader->GetNormalizedPool().FindMessageTypeByName(storable.key().message_type()))->New();
Message * normalizedMessage = this->CreateMessage(NORMALIZED, storable.key().message_type());
// The original message is also retrieved
Message * originalMessage = this->messageFactory.GetPrototype(
loader->GetSchemaPool().FindMessageTypeByName(storable.key().message_type()))->New();
Message * originalMessage = this->CreateMessage(SCHEMA, storable.key().message_type());
// StorableMessage payload contains the serialized normalized message,
// as previously stored into the DB
......@@ -146,7 +147,19 @@ const Message & profanedb::format::protobuf::Marshaller::Unmarshal(const Storabl
return *originalMessage;
}
void profanedb::format::protobuf::Marshaller::CopyField(
Message * Marshaller::CreateMessage(Marshaller::MessagePool pool, std::string type)
{
// DescriptorPool is either from Schema or Normalized
const DescriptorPool & descriptorPool =
(pool == SCHEMA)
? loader->GetSchemaPool()
: loader->GetNormalizedPool();
// TODO Check whether type exists
return this->messageFactory.GetPrototype(descriptorPool.FindMessageTypeByName(type))->New();
}
void Marshaller::CopyField(
const FieldDescriptor * fromField,
const Message & from,
Message * to)
......@@ -208,7 +221,7 @@ void profanedb::format::protobuf::Marshaller::CopyField(
}
}
Key profanedb::format::protobuf::Marshaller::FieldToKey(
Key Marshaller::FieldToKey(
const Message & message,
const FieldDescriptor * fd)
{
......@@ -276,3 +289,7 @@ Key profanedb::format::protobuf::Marshaller::FieldToKey(
return key;
}
}
}
}
......@@ -42,12 +42,17 @@ class Marshaller : public profanedb::format::Marshaller<google::protobuf::Messag
public:
Marshaller(
std::shared_ptr<profanedb::vault::Storage> storage,
std::shared_ptr<Loader> loader
);
std::shared_ptr<Loader> loader);
virtual profanedb::protobuf::MessageTreeNode Marshal(const google::protobuf::Message & message) override;
virtual const google::protobuf::Message & Unmarshal(const profanedb::protobuf::StorableMessage & storable) override;
enum MessagePool {
SCHEMA,
NORMALIZED
};
google::protobuf::Message * CreateMessage(MessagePool pool, std::string type);
private:
// Loader contains the schemaPool and normalizedPool
const std::shared_ptr<Loader> loader;
......
......@@ -25,15 +25,22 @@ using ProtobufMarshaller = profanedb::format::protobuf::Marshaller;
using profanedb::vault::Storage;
using RocksStorage = profanedb::vault::rocksdb::Storage;
using namespace profanedb::protobuf;
using google::protobuf::Message;
using grpc::ServerBuilder;
using grpc::ServerContext;
using grpc::Status;
namespace profanedb {
namespace server {
profanedb::server::Server::Server()
Server::Server()
{
// TODO Config
boost::log::core::get()->set_filter(
boost::log::trivial::severity >= boost::log::trivial::debug
boost::log::trivial::severity >= boost::log::trivial::trace
);
// TODO Config
......@@ -58,16 +65,15 @@ profanedb::server::Server::Server()
auto marshaller = std::make_shared<ProtobufMarshaller>(storage, loader);
service = std::make_unique<DbServiceImpl>(
std::make_unique< profanedb::Db<Message> >(storage, marshaller));
service = std::make_unique<DbServiceImpl>(storage, marshaller);
}
profanedb::server::Server::~Server()
Server::~Server()
{
server->Shutdown();
}
void profanedb::server::Server::Run()
void Server::Run()
{
std::string address("0.0.0.0:50051");
......@@ -83,38 +89,53 @@ void profanedb::server::Server::Run()
HandleRpcs();
}
void profanedb::server::Server::HandleRpcs()
void Server::HandleRpcs()
{
server->Wait();
}
profanedb::server::Server::DbServiceImpl::DbServiceImpl(std::unique_ptr< profanedb::Db<Message> > profane)
: profane(std::move(profane))
Server::DbServiceImpl::DbServiceImpl(
std::shared_ptr<RocksStorage> storage,
std::shared_ptr<ProtobufMarshaller> marshaller)
: rocksdbStorage(storage)
, protobufMarshaller(marshaller)
, profane(std::make_unique< profanedb::Db<Message> >(storage, marshaller))
{
}
grpc::Status profanedb::server::Server::DbServiceImpl::Get(grpc::ServerContext * context, const profanedb::protobuf::GetReq * request, profanedb::protobuf::GetResp * response)
Status Server::DbServiceImpl::Get(ServerContext * context, const GetReq * request, GetResp * response)
{
BOOST_LOG_TRIVIAL(debug) << "GET request from " << context->peer();
response->mutable_message()->PackFrom(this->profane->Get(request->key()));
return grpc::Status::OK;
return Status::OK;
}
grpc::Status profanedb::server::Server::DbServiceImpl::Put(grpc::ServerContext * context, const profanedb::protobuf::PutReq * request, profanedb::protobuf::PutResp * response)
Status Server::DbServiceImpl::Put(ServerContext * context, const PutReq * request, PutResp * response)
{
BOOST_LOG_TRIVIAL(debug) << "PUT request from " << context->peer();
// TODO Unpack
// this->profanedb.Put(request->serializable());
// Because the incoming request brings a google::protobuf::Any message,
// we must dynamically create the actual message according to its type
// (which comes with `type.googleapis.com/` prepended)
std::string type = request->serializable().type_url();
Message * unpackedMessage =
this->protobufMarshaller->CreateMessage(ProtobufMarshaller::SCHEMA, type.substr(type.rfind('/')+1, std::string::npos));
request->serializable().UnpackTo(unpackedMessage);
this->profane->Put(*unpackedMessage);
return grpc::Status::OK;
return Status::OK;
}
grpc::Status profanedb::server::Server::DbServiceImpl::Delete(grpc::ServerContext * context, const profanedb::protobuf::DelReq * request, profanedb::protobuf::DelResp * response)
Status Server::DbServiceImpl::Delete(ServerContext * context, const DelReq * request, DelResp * response)
{
BOOST_LOG_TRIVIAL(debug) << "DELETE request from " << context->peer();
return grpc::Status::OK;
return Status::OK;
}
}
}
......@@ -55,15 +55,29 @@ private:
class DbServiceImpl : public profanedb::protobuf::Db::Service {
public:
DbServiceImpl(std::unique_ptr< profanedb::Db<google::protobuf::Message> > profane);
DbServiceImpl(
std::shared_ptr<profanedb::vault::rocksdb::Storage> storage,
std::shared_ptr<profanedb::format::protobuf::Marshaller> marshaller);
grpc::Status Get(grpc::ServerContext * context, const profanedb::protobuf::GetReq * request, profanedb::protobuf::GetResp* response) override;
grpc::Status Get(
grpc::ServerContext * context,
const profanedb::protobuf::GetReq * request,
profanedb::protobuf::GetResp* response) override;
grpc::Status Put(grpc::ServerContext * context, const profanedb::protobuf::PutReq * request, profanedb::protobuf::PutResp * response) override;
grpc::Status Put(
grpc::ServerContext * context,
const profanedb::protobuf::PutReq * request,
profanedb::protobuf::PutResp * response) override;
grpc::Status Delete(grpc::ServerContext * context, const profanedb::protobuf::DelReq * request, profanedb::protobuf::DelResp * response) override;
grpc::Status Delete(
grpc::ServerContext * context,
const profanedb::protobuf::DelReq * request,
profanedb::protobuf::DelResp * response) override;
private:
std::shared_ptr<profanedb::vault::rocksdb::Storage> rocksdbStorage;
std::shared_ptr<profanedb::format::protobuf::Marshaller> protobufMarshaller;
std::unique_ptr< profanedb::Db<google::protobuf::Message> > profane;
};
std::unique_ptr<DbServiceImpl> service;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment