...
 
Commits (3)
......@@ -100,7 +100,7 @@ defaultConfig path = do
runNode :: String -> IO ()
runNode configPath = do
config <- Config.readConfig configPath
env <- mkP2PEnv undefined config undefined undefined
env <- mkP2PEnv config undefined undefined undefined undefined
runFileLoggingT (toS $ Config.logFile config) $
-- runStdoutLoggingT $
runAppM
......@@ -132,7 +132,7 @@ runNode configPath = do
runBSNode :: String -> IO ()
runBSNode configPath = do
config <- Config.readConfig configPath
env <- mkP2PEnv undefined config undefined undefined
env <- mkP2PEnv config undefined undefined undefined undefined
runFileLoggingT (toS $ Config.logFile config) $
-- runStdoutLoggingT $
runAppM
......
......@@ -150,8 +150,6 @@ instance HasInbox (P2PEnv r t rmsg pmsg) pmsg where
inbox = pubSubInbox . psEnv
instance HasCache (P2PEnv r t rmsg pmsg) pmsg where
cache = pubSubCache . psEnv
instance HasTopicHandlers (P2PEnv r t rmsg pmsg) t pmsg where
topicHandlers = pubSubHandlers . psEnv
instance HasPubSubEnv (P2PEnv r t rmsg pmsg) t pmsg where
pubSubEnv = psEnv
......
......@@ -20,9 +20,6 @@ class HasSubscribers env t | env -> t where
class HasNotifiers env t | env -> t where
notifiers :: env -> Notifiers t
class HasTopicHandlers env t msg | env -> t msg where
topicHandlers :: env -> TopicHandlers t msg
class HasInbox env msg | env -> msg where
inbox :: env -> TVar (Inbox msg)
......
......@@ -35,14 +35,11 @@ data PubSubEnv t msg = PubSubEnv
, pubSubNotifiers :: Notifiers t
, pubSubInbox :: TVar (Inbox msg)
, pubSubCache :: TVar (Cache msg)
, pubSubHandlers :: TopicHandlers t msg
}
class (HasTopics env t, HasSubscribers env t, HasNotifiers env t, HasInbox env msg, HasCache env msg, HasTopicHandlers env t msg) => HasPubSubEnv env t msg where
class (HasTopics env t, HasSubscribers env t, HasNotifiers env t, HasInbox env msg, HasCache env msg) => HasPubSubEnv env t msg where
pubSubEnv :: env -> PubSubEnv t msg
-- type HasPubSubEnv env t msg = (HasTopics env t, HasSubscribers env t, HasNotifiers env t, HasInbox env msg, HasCache env msg, HasTopicHandlers env t msg)
type HasPubSub env t msg
= ( HasPubSubEnv env t msg
, Eq t, Ord t, Hashable t, Serialise t
......@@ -50,9 +47,8 @@ type HasPubSub env t msg
)
mkPubSub :: (Ord t, Hashable t) => TopicHandlers t msg -> IO (PubSubEnv t msg)
mkPubSub (TopicHandlers h) = do
let topicList = HM.keys h
mkPubSub :: (Ord t, Hashable t) => [t] -> IO (PubSubEnv t msg)
mkPubSub topicList = do
subTVars <- mapM (\_ -> newTVarIO Set.empty) topicList
notifTVars <- mapM (\_ -> newTVarIO Set.empty) topicList
PubSubEnv <$> pure (Set.fromList topicList)
......@@ -60,7 +56,6 @@ mkPubSub (TopicHandlers h) = do
<*> pure (Notifiers (HM.fromList (zip topicList notifTVars)))
<*> newTVarIO (Inbox HM.empty)
<*> newTVarIO (Cache HM.empty)
<*> pure (TopicHandlers h)
instance HasTopics (PubSubEnv t msg) t where
topics = pubSubTopics
......@@ -76,6 +71,3 @@ instance HasInbox (PubSubEnv t msg) msg where
instance HasCache (PubSubEnv t msg) msg where
cache = pubSubCache
instance HasTopicHandlers (PubSubEnv t msg) t msg where
topicHandlers = pubSubHandlers
......@@ -5,12 +5,10 @@
module Arivi.P2P.PubSub.Types
( NodeTimer(..)
, TopicHandler(..)
, Subscribers(..)
, Notifiers(..)
, Inbox(..)
, Cache(..)
, TopicHandlers(..)
, Status(..)
, Timer
, subscribersForTopic
......@@ -35,10 +33,6 @@ import qualified Data.Set as Set
import Data.Time.Clock
import GHC.Generics (Generic)
import Control.Monad.IO.Class
newtype TopicHandler msg =
TopicHandler (forall m. (MonadIO m) => msg -> m Status)
type Timer = Integer
......@@ -55,8 +49,6 @@ newtype Inbox msg = Inbox (HM.HashMap msg (TVar (Set NodeId)))
newtype Cache msg = Cache (HM.HashMap msg (MVar Status))
newtype TopicHandlers t msg = TopicHandlers (HM.HashMap t (TopicHandler msg))
data Status = Ok
| Error
deriving (Eq, Ord, Show, Generic, Serialise)
......
......@@ -18,7 +18,7 @@ import Data.Set as Set
data RpcEnv r msg = RpcEnv {
rpcResourcers :: Resourcers r
, rpcHandlers :: ResourceHandlers r msg
, rpcHandlers :: ResourceHandler msg
}
class HasRpcEnv env r msg | env -> r msg where
......@@ -30,9 +30,8 @@ type HasRpc env r msg =
, Eq msg, Hashable msg, Serialise msg
)
mkRpc :: (Ord r, Hashable r) => ResourceHandlers r msg -> IO (RpcEnv r msg)
mkRpc (ResourceHandlers h) = do
let resourceList = HM.keys h
mkRpc :: (Ord r, Hashable r) => ResourceHandler msg -> [r] -> IO (RpcEnv r msg)
mkRpc rh resourceList = do
resTVars <- mapM (\_ -> newTVarIO Set.empty) resourceList
RpcEnv <$> pure (Resourcers (HM.fromList (zip resourceList resTVars)))
<*> pure (ResourceHandlers h)
<*> pure rh
......@@ -11,21 +11,18 @@ import Arivi.P2P.Types
import Arivi.P2P.RPC.Types
import Arivi.P2P.RPC.Env
import Control.Lens
import Control.Monad.Reader
import qualified Data.HashMap.Strict as HM
rpcHandler ::
( MonadReader env m
, HasRpc env r msg)
, HasRpc env r msg, MonadIO m)
=> Request 'Rpc (RpcPayload r msg)
-> m (Response 'Rpc (RpcPayload r msg))
rpcHandler (RpcRequest (RpcPayload resource msg)) = RpcResponse <$> do
rpcRecord <- asks rpcEnv
let ResourceHandlers h = rpcHandlers rpcRecord
case h ^. at resource of
Just (ResourceHandler f) -> return (RpcPayload resource (f msg))
Nothing -> error "Shouldn't reach here. Will change this to single handler eventually"
let ResourceHandler h = rpcHandlers rpcRecord
RpcPayload resource <$> h msg
rpcHandler (RpcRequest (RpcError _)) = error "Shouldn't get an error message as request"
......@@ -35,5 +32,5 @@ optionsHandler ::
=> m (Response 'Option (Supported [r]))
optionsHandler = OptionResponse <$> do
rpcRecord <- asks rpcEnv
let ResourceHandlers h = rpcHandlers rpcRecord
return (Supported (HM.keys h))
let Resourcers r = rpcResourcers rpcRecord
return (Supported (HM.keys r))
......@@ -8,17 +8,16 @@ module Arivi.P2P.RPC.Types
import Arivi.P2P.MessageHandler.HandlerTypes (NodeId)
import Codec.Serialise (Serialise)
import Control.Concurrent.STM.TVar
import Control.Monad.IO.Class
import Data.HashMap.Strict as HM
import Data.Hashable
import Data.Set (Set)
import GHC.Generics (Generic)
newtype Resourcers r = Resourcers (HM.HashMap r (TVar (Set NodeId)))
newtype ResourceHandler msg = ResourceHandler (msg -> msg)
newtype ResourceHandlers r msg = ResourceHandlers (HM.HashMap r (ResourceHandler msg))
newtype ResourceHandler msg = ResourceHandler (forall m. (MonadIO m) => msg -> m msg)
data Options r = Options deriving (Eq, Ord, Show, Generic, Serialise)
......
......@@ -24,13 +24,14 @@ mkHandlers = Handlers rpcHandler kademliaMessageHandler optionsHandler pubSubHan
mkP2PEnv ::
(Ord t, Hashable t, Ord r, Hashable r)
=> (forall env m. (HasP2PEnv env m r t rmsg pmsg) =>
pmsg -> m Status)
-> Config.Config
-> ResourceHandlers r rmsg
-> TopicHandlers t pmsg
=> Config.Config
-> ResourceHandler rmsg
-> (forall env m. (HasP2PEnv env m r t rmsg pmsg) =>
pmsg -> m Status)
-> [r]
-> [t]
-> IO (P2PEnv r t rmsg pmsg)
mkP2PEnv psH config rh th = do
mkP2PEnv config rh psH resources topics = do
let nc =
NetworkConfig
(Config.myNodeId config)
......@@ -43,8 +44,8 @@ mkP2PEnv psH config rh th = do
(read $ show $ Config.udpPort config)
(Config.secretKey config)
nep <- mkNodeEndpoint nc mkHandlers networkEnv
nrpc <- mkRpc rh
nps <- mkPubSub th
nrpc <- mkRpc rh resources
nps <- mkPubSub topics
nk <-
mkKademlia
nc
......
......@@ -86,11 +86,7 @@ defaultConfig path = do
runNode :: String -> IO ()
runNode configPath = do
config <- Config.readConfig configPath
let resourceHandlersNew =
ResourceHandlers (HM.insert HelloWorld handlerNew HM.empty)
let topicHandlersNew =
TopicHandlers (HM.insert HelloWorldHeader handlerTopic HM.empty)
env <- mkP2PEnv globalHandler config resourceHandlersNew topicHandlersNew
env <- mkP2PEnv config (ResourceHandler globalHandlerRpc) globalHandlerPubSub [HelloWorld] [HelloWorldHeader]
runFileLoggingT (toS $ Config.logFile config) $
runAppM
env
......
......@@ -8,7 +8,6 @@ module Service.HelloWorld
import Arivi.P2P.P2PEnv
import Arivi.P2P.RPC.Fetch
import Arivi.P2P.RPC.Types
import Arivi.P2P.Types
import Arivi.P2P.PubSub.Types
import Arivi.P2P.PubSub.Publish
......@@ -33,20 +32,9 @@ instance Hashable ServiceResource
instance Serialise ServiceTopic
instance Hashable ServiceTopic
handlerNew :: ResourceHandler String
handlerNew = ResourceHandler (++ "Praise Jesus")
ioHello :: (MonadIO m) => String -> m Status
ioHello msg =
if msg == "HelloworldHeader"
then liftIO (Prelude.putStrLn "Ok") >> return Ok
else liftIO (Prelude.putStrLn "Error") >> return Error
handlerTopic :: TopicHandler String
handlerTopic = TopicHandler ioHello
globalHandler :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => String -> m Status
globalHandler msg =
globalHandlerPubSub :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => String -> m Status
globalHandlerPubSub msg =
if msg == "HelloworldHeader"
then do
liftIO (Prelude.putStrLn "Ok")
......@@ -54,13 +42,10 @@ globalHandler msg =
return Ok
else liftIO (Prelude.putStrLn "Error") >> return Error
-- registerHelloWorld :: (HasP2PEnv env m ServiceResource String String String) => m ()
-- registerHelloWorld =
-- registerResource HelloWorld handler Archived >>
-- liftIO (threadDelay 5000000) >>
-- updatePeerInResourceMap HelloWorld
globalHandlerRpc :: (MonadIO m) => String -> m String
globalHandlerRpc msg =
if msg == "HelloWorld" then return (msg ++ "Praise Satoshi")
else return ("msg" ++ "Fake satoshi")
getHelloWorld :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => String -> m ()
getHelloWorld msg = do
......