Commit 4bb73473 authored by Giorgio Azzinnaro's avatar Giorgio Azzinnaro

moved to logic to Parser and Server (test synchronous server)

parent 009b4432
include_directories(${CMAKE_CURRENT_BINARY_DIR}/../.. ${CMAKE_CURRENT_SOURCE_DIR}/../..)
add_executable(prodisdb_server main.cpp prodis_error_collector.cpp prodis_server.cpp)
add_executable(prodisdb_server main.cpp parser.cpp server.cpp)
target_link_libraries(prodisdb_server prodisdb_protobuf)
......@@ -4,54 +4,13 @@
#include <google/protobuf/io/zero_copy_stream.h>
#include <prodisdb/protobuf/options.pb.h>
#include <prodisdb/server/prodis_error_collector.h>
#include <prodisdb/server/server.h>
const std::string PROTO_DIR = "/home/giorgio/Documents/ProdisDB/test"; // TODO Should be config
using namespace google::protobuf;
int main(int argc, char *argv[]) {
compiler::DiskSourceTree *sourceTree;
io::ZeroCopyInputStream *inputStream;
compiler::MultiFileErrorCollector *errCollector = new ProdisErrorCollector();
compiler::Importer *importer;
compiler::SourceTreeDescriptorDatabase *descriptorDb;
DescriptorPool *pool;
sourceTree = new compiler::DiskSourceTree();
sourceTree->MapPath("", "/home/giorgio/Documents/ProdisDB/src"); // prodis options
sourceTree->MapPath("", "/usr/include"); // /usr/include is for google/protobuf/... files
sourceTree->MapPath("", PROTO_DIR); // Here goes the schema defined by the user
inputStream = sourceTree->Open(""); // To load files in PROTO_DIR
if (inputStream == NULL) {
std::cerr << "Couldn't open .proto source tree: " << sourceTree->GetLastErrorMessage() << "\n";
}
descriptorDb = new compiler::SourceTreeDescriptorDatabase(sourceTree);
descriptorDb->RecordErrorsTo(errCollector);
pool = new DescriptorPool(descriptorDb);
// HACK DescriptorPool::BuildFile should be used (for performance)
pool->FindFileByName("prodisdb/protobuf/options.proto");
pool->FindFileByName("test.proto");
const Descriptor *message = pool->FindMessageTypeByName("test.Test");
const FieldDescriptor *fd;
// TODO Should check for multiple keys (might be supported later), for now throw error
for (int idx = 0; idx < message->field_count(); idx++) {
fd = message->field(idx);
std::cout
<< fd->full_name()
<< " is key: "
<< fd->options().GetExtension(prodisdb::protobuf::options).key()
<< "\n";
if (fd->options().GetExtension(prodisdb::protobuf::options).key()) {
break;
}
}
int main(int argc, char* argv[]) {
prodisdb::server::Server server;
server.Run();
}
#include "parser.h"
prodisdb::server::Parser::Parser()
{
sourceTree.MapPath("", "/usr/include"); // google/protobuf/... should be here
sourceTree.MapPath("", "/home/giorgio/Documents/ProdisDB/src"); // HACK prodisdb/options
sourceTree.MapPath("", "/home/giorgio/Documents/ProdisDB/test"); // HACK The DB schema defined by the user
inputStream = sourceTree.Open("");
if (inputStream == NULL) {
std::cerr << "Couldn't open .proto source tree: " << sourceTree.GetLastErrorMessage() << std::endl;
}
descriptorDb = new compiler::SourceTreeDescriptorDatabase(&sourceTree);
descriptorDb->RecordErrorsTo(errCollector);
pool = new DescriptorPool(descriptorDb);
pool->FindFileByName("prodisdb/protobuf/options.proto");
pool->FindFileByName("test.proto");
}
prodisdb::server::Parser::~Parser()
{
}
void prodisdb::server::Parser::ParseMessage(const protobuf::Serializable& message)
{
const Descriptor* definition = pool->FindMessageTypeByName(message.type());
const FieldDescriptor* fd;
// TODO Should check for multiple keys (might be supported later), for now throw error
for (int idx = 0; idx < definition->field_count(); idx++) {
fd = definition->field(idx);
std::cout
<< fd->full_name()
<< " is key: "
<< fd->options().GetExtension(prodisdb::protobuf::options).key()
<< std::endl;
if (fd->options().GetExtension(prodisdb::protobuf::options).key()) {
break;
}
}
std::cout << message.type() << std::endl;
}
prodisdb::server::Parser::ErrorCollector::ErrorCollector()
{
}
void prodisdb::server::Parser::ErrorCollector::AddError(const string& filename, int line, int column, const string& message)
{
if (line == -1) { // Entire file error
std::cerr << filename << " error: " << message << "\n";
} else {
std::cerr << filename << " " << line+1 << ":" << column+1 << " error: " << message << "\n";
}
}
void prodisdb::server::Parser::ErrorCollector::AddWarning(const string& filename, int line, int column, const string& message)
{
std::cerr << filename << " " << line+1 << ":" << column+1 << " warning: " << message << "\n";
}
#ifndef PARSER_H
#define PARSER_H
#include <iostream>
#include <google/protobuf/compiler/importer.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <prodisdb/protobuf/db.pb.h>
#include <prodisdb/protobuf/options.pb.h>
using namespace google::protobuf;
namespace prodisdb {
namespace server {
class Parser
{
public:
Parser();
~Parser();
void ParseMessage(const protobuf::Serializable& message);
private:
io::ZeroCopyInputStream* inputStream;
compiler::DiskSourceTree sourceTree;
compiler::MultiFileErrorCollector* errCollector = new ErrorCollector();
compiler::SourceTreeDescriptorDatabase* descriptorDb;
DescriptorPool* pool;
class ErrorCollector : public compiler::MultiFileErrorCollector {
public:
ErrorCollector();
void AddError(const string & filename, int line, int column, const string & message) override;
void AddWarning(const string & filename, int line, int column, const string & message) override;
};
};
}
}
#endif // PARSER_H
#include "prodis_error_collector.h"
ProdisErrorCollector::ProdisErrorCollector()
{
}
void ProdisErrorCollector::AddError(const std::__cxx11::string& filename, int line, int column, const std::__cxx11::string& message)
{
if (line == -1) { // Entire file error
std::cerr << filename << " error: " << message << "\n";
} else {
std::cerr << filename << " " << line+1 << ":" << column+1 << " error: " << message << "\n";
}
}
void ProdisErrorCollector::AddWarning(const std::__cxx11::string& filename, int line, int column, const std::__cxx11::string& message)
{
std::cerr << filename << " " << line+1 << ":" << column+1 << " warning: " << message << "\n";
}
#ifndef PRODISERRORCOLLECTOR_H
#define PRODISERRORCOLLECTOR_H
#include <iostream>
#include <google/protobuf/compiler/importer.h>
class ProdisErrorCollector : public google::protobuf::compiler::MultiFileErrorCollector
{
public:
ProdisErrorCollector();
virtual void AddError(const std::__cxx11::string& filename, int line, int column, const std::__cxx11::string& message);
virtual void AddWarning(const std::__cxx11::string& filename, int line, int column, const std::__cxx11::string& message);
};
#endif // PRODISERRORCOLLECTOR_H
#include "prodis_server.h"
ProdisServer::~ProdisServer()
{
server->Shutdown();
cq->Shutdown();
}
#include "server.h"
prodisdb::server::Server::Server()
{
}
prodisdb::server::Server::~Server()
{
server->Shutdown();
}
void prodisdb::server::Server::Run()
{
std::string address("0.0.0.0:50051");
grpc::ServerBuilder builder;
builder.AddListeningPort(address, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
server = builder.BuildAndStart();
std::cout << "Server listening on " << address << std::endl;
HandleRpcs();
}
void prodisdb::server::Server::HandleRpcs()
{
server->Wait();
}
grpc::Status prodisdb::server::Server::DbServiceImpl::Get(grpc::ServerContext* context, const prodisdb::protobuf::PrimaryKey* request, prodisdb::protobuf::Serializable* response)
{
}
grpc::Status prodisdb::server::Server::DbServiceImpl::Create(grpc::ServerContext* context, const prodisdb::protobuf::Serializable* request, prodisdb::protobuf::Empty* response)
{
parser.ParseMessage(*request);
}
grpc::Status prodisdb::server::Server::DbServiceImpl::Delete(::grpc::ServerContext* context, const prodisdb::protobuf::PrimaryKey* request, prodisdb::protobuf::Empty* response)
{
}
grpc::Status prodisdb::server::Server::DbServiceImpl::Update(grpc::ServerContext* context, const prodisdb::protobuf::Serializable* request, prodisdb::protobuf::Empty* response)
{
}
#ifndef PRODISSERVER_H
#define PRODISSERVER_H
#ifndef SERVER_H
#define SERVER_H
#include <grpc++/grpc++.h>
#include <grpc/support/log.h>
#include <prodisdb/server/parser.h>
#include <prodisdb/protobuf/db.pb.h>
#include <prodisdb/protobuf/db.grpc.pb.h>
......@@ -12,42 +15,39 @@ using prodisdb::protobuf::PrimaryKey;
using prodisdb::protobuf::Db;
class ProdisServer
namespace prodisdb {
namespace server {
class Server
{
public:
~ProdisServer();
Server();
~Server();
void Run();
private:
void HandleRpcs();
std::unique_ptr<grpc::ServerCompletionQueue> cq;
Db::AsyncService service;
std::unique_ptr<grpc::Server> server;
class CallData {
class DbServiceImpl : public Db::Service {
public:
CallData(Db::AsyncService *service, grpc::ServerCompletionQueue *cq);
void Proceed();
grpc::Status Get(grpc::ServerContext* context, const prodisdb::protobuf::PrimaryKey* request, prodisdb::protobuf::Serializable* response) override;
private:
Db::AsyncService *mService;
grpc::ServerCompletionQueue *mCq;
grpc::Status Create(grpc::ServerContext* context, const prodisdb::protobuf::Serializable* request, prodisdb::protobuf::Empty* response) override;
grpc::ServerContext mContext;
grpc::Status Delete(::grpc::ServerContext* context, const prodisdb::protobuf::PrimaryKey* request, prodisdb::protobuf::Empty* response) override;
Empty pbEmpty;
Serializable pbSerializable;
PrimaryKey pbPrimaryKey;
grpc::Status Update(grpc::ServerContext* context, const prodisdb::protobuf::Serializable* request, prodisdb::protobuf::Empty* response) override;
grpc::ServerAsyncResponseWriter<Serializable> serializableResp;
grpc::ServerAsyncResponseWriter<Empty> emptyResp;
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus mStatus;
private:
Parser parser;
};
DbServiceImpl service;
};
#endif // PRODISSERVER_H
}
}
#endif // SERVER_H
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment