...
 
Commits (2)
......@@ -100,7 +100,7 @@ defaultConfig path = do
runNode :: String -> IO ()
runNode configPath = do
config <- Config.readConfig configPath
env <- mkP2PEnv config undefined undefined
env <- mkP2PEnv undefined config undefined undefined
runFileLoggingT (toS $ Config.logFile config) $
-- runStdoutLoggingT $
runAppM
......@@ -132,7 +132,7 @@ runNode configPath = do
runBSNode :: String -> IO ()
runBSNode configPath = do
config <- Config.readConfig configPath
env <- mkP2PEnv config undefined undefined
env <- mkP2PEnv undefined config undefined undefined
runFileLoggingT (toS $ Config.logFile config) $
-- runStdoutLoggingT $
runAppM
......
......@@ -19,6 +19,7 @@ import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.PubSub.Env
import Arivi.P2P.PubSub.Class
import Arivi.P2P.RPC.Types
import Arivi.P2P.PubSub.Types
import Arivi.P2P.RPC.Env
import Arivi.P2P.PRT.Types
import Arivi.Utils.Logging
......@@ -45,6 +46,7 @@ type HasP2PEnv env m r t rmsg pmsg
, HasPRT m
, MonadReader env m
, HasNetworkConfig env NetworkConfig
, HasPSGlobalHandler env r t rmsg pmsg
)
data NodeEndpointEnv = NodeEndpointEnv {
......@@ -81,8 +83,15 @@ data P2PEnv r t rmsg pmsg = P2PEnv {
, kademliaEnv :: KademliaEnv
, statsdClient :: StatsdClient
, prtEnv :: PRTEnv
, psHandler :: forall env m . (HasP2PEnv env m r t rmsg pmsg) => pmsg -> m Status
}
class HasPSGlobalHandler env r t rmsg pmsg where
psGlobalHandler :: env -> (forall env' m . (HasP2PEnv env' m r t rmsg pmsg) => pmsg -> m Status)
instance HasPSGlobalHandler (P2PEnv r t rmsg pmsg) r t rmsg pmsg where
psGlobalHandler = psHandler
class (HasSecretKey m) => HasNodeEndpoint m where
getEndpointEnv :: m NodeEndpointEnv
getNetworkConfig :: m NetworkConfig
......
......@@ -46,12 +46,11 @@ notifyHandler ::
=> NodeId
-> Request ('PubSub 'Notify) (PubSubPayload t pmsg)
-> m (Response ('PubSub 'Notify) Status)
notifyHandler nid (PubSubRequest payload@(PubSubPayload (t, msg))) = do
notifyHandler nid (PubSubRequest payload@(PubSubPayload (_, msg))) = do
$(logDebug) "Notify received handler invoked"
inboxed <- asks inbox
cached <- asks cache
h <- asks topicHandlers
resp <- handleTopic nid inboxed cached h t msg
resp <- handleTopic nid inboxed cached msg
case resp of
Ok -> do
$(logDebug) "handleTopic successful notifying subscribers"
......@@ -66,12 +65,11 @@ publishHandler ::
=> NodeId
-> Request ('PubSub 'Publish) (PubSubPayload t pmsg)
-> m (Response ('PubSub 'Publish) Status)
publishHandler nid (PubSubRequest payload@(PubSubPayload (t, msg))) = do
publishHandler nid (PubSubRequest payload@(PubSubPayload (_, msg))) = do
$(logDebug) "Publish received handler invoked"
inboxed <- asks inbox
cached <- asks cache
h <- asks topicHandlers
resp <- handleTopic nid inboxed cached h t msg
resp <- handleTopic nid inboxed cached msg
case resp of
Ok -> do
$(logDebug) "handleTopic successful notifying subscribers"
......@@ -97,17 +95,16 @@ subscribeHandler nid (PubSubRequest (PubSubPayload (t, subTimer))) = do
else return (PubSubResponse Error)
handleTopic ::
(Eq t, Hashable t, Eq msg, Hashable msg, MonadIO m)
(HasP2PEnv env m r t rmsg pmsg)
=> NodeId
-> TVar (Inbox msg)
-> TVar (Cache msg)
-> TopicHandlers t msg
-> t
-> msg
-> TVar (Inbox pmsg)
-> TVar (Cache pmsg)
-> pmsg
-> m Status
handleTopic nid inboxed cached (TopicHandlers hs) t msg = do
handleTopic nid inboxed cached msg = do
-- Add node to the inbox
Inbox inbox' <- liftIO $ readTVarIO inboxed
hh <- asks psGlobalHandler
case inbox' ^. at msg of
Just x -> liftIO $ atomically $ modifyTVar x (Set.insert nid)
Nothing -> do
......@@ -127,9 +124,6 @@ handleTopic nid inboxed cached (TopicHandlers hs) t msg = do
modifyTVar
cached
(\(Cache c) -> Cache (c & at msg ?~ def))
case hs ^. at t of
Just (TopicHandler h) -> do
resp <- h msg
liftIO $ putMVar def resp
return resp
Nothing -> error "Shouldn't reach here"
resp <- hh msg
liftIO $ putMVar def resp
return resp
{-# LANGUAGE RankNTypes #-}
module Arivi.P2P.ServiceRegistry
( mkP2PEnv
) where
......@@ -20,12 +22,48 @@ import Data.Hashable
mkHandlers :: Handlers
mkHandlers = Handlers rpcHandler kademliaMessageHandler optionsHandler pubSubHandler
mkP2PEnv :: (Ord t, Hashable t, Ord r, Hashable r) => Config.Config -> ResourceHandlers r rmsg-> TopicHandlers t pmsg -> IO (P2PEnv r t rmsg pmsg)
mkP2PEnv config rh th = do
let nc = NetworkConfig (Config.myNodeId config) (Config.myIp config) (Config.tcpPort config) (Config.udpPort config)
let networkEnv = mkAriviEnv (read $ show $ Config.tcpPort config) (read $ show $ Config.udpPort config) (Config.secretKey config)
P2PEnv <$> mkNodeEndpoint nc mkHandlers networkEnv <*> mkRpc rh <*> mkPubSub th <*> mkKademlia nc (Config.sbound config) (Config.pingThreshold config) (Config.kademliaConcurrencyFactor config) (Config.hopBound config) <*> createStatsdClient "127.0.0.1" 8080 "statsdPrefix" <*> mkPRTEnv
mkP2PEnv ::
(Ord t, Hashable t, Ord r, Hashable r)
=> (forall env m. (HasP2PEnv env m r t rmsg pmsg) =>
pmsg -> m Status)
-> Config.Config
-> ResourceHandlers r rmsg
-> TopicHandlers t pmsg
-> IO (P2PEnv r t rmsg pmsg)
mkP2PEnv psH config rh th = do
let nc =
NetworkConfig
(Config.myNodeId config)
(Config.myIp config)
(Config.tcpPort config)
(Config.udpPort config)
let networkEnv =
mkAriviEnv
(read $ show $ Config.tcpPort config)
(read $ show $ Config.udpPort config)
(Config.secretKey config)
nep <- mkNodeEndpoint nc mkHandlers networkEnv
nrpc <- mkRpc rh
nps <- mkPubSub th
nk <-
mkKademlia
nc
(Config.sbound config)
(Config.pingThreshold config)
(Config.kademliaConcurrencyFactor config)
(Config.hopBound config)
ncsc <- createStatsdClient "127.0.0.1" 8080 "statsdPrefix"
nprt <- mkPRTEnv
return
P2PEnv
{ nodeEndpointEnv = nep
, rEnv = nrpc
, psEnv = nps
, kademliaEnv = nk
, statsdClient = ncsc
, prtEnv = nprt
, psHandler = psH
}
-- makeP2Pinstance ::
-- NodeId
......
......@@ -90,7 +90,7 @@ runNode configPath = do
ResourceHandlers (HM.insert HelloWorld handlerNew HM.empty)
let topicHandlersNew =
TopicHandlers (HM.insert HelloWorldHeader handlerTopic HM.empty)
env <- mkP2PEnv config resourceHandlersNew topicHandlersNew
env <- mkP2PEnv globalHandler config resourceHandlersNew topicHandlersNew
runFileLoggingT (toS $ Config.logFile config) $
runAppM
env
......
......@@ -16,6 +16,7 @@ import Arivi.P2P.PubSub.Publish
import GHC.Generics
import Codec.Serialise
import Control.Concurrent.Async.Lifted
import Control.Monad.IO.Class
import Data.ByteString.Lazy as Lazy
import Data.Hashable
......@@ -44,16 +45,27 @@ ioHello msg =
handlerTopic :: TopicHandler String
handlerTopic = TopicHandler ioHello
globalHandler :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => String -> m Status
globalHandler msg =
if msg == "HelloworldHeader"
then do
liftIO (Prelude.putStrLn "Ok")
_ <- async (getHelloWorld msg)
return Ok
else liftIO (Prelude.putStrLn "Error") >> return Error
-- registerHelloWorld :: (HasP2PEnv env m ServiceResource String String String) => m ()
-- registerHelloWorld =
-- registerResource HelloWorld handler Archived >>
-- liftIO (threadDelay 5000000) >>
-- updatePeerInResourceMap HelloWorld
getHelloWorld :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => m ()
getHelloWorld = do
resource <- fetchResource (RpcPayload HelloWorld "HelloWorld")
liftIO $ print "here"
getHelloWorld :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => String -> m ()
getHelloWorld msg = do
resource <- fetchResourceForMessage msg (RpcPayload HelloWorld "HelloWorld")
liftIO $ print "got resource from notify/publish"
liftIO $ print resource
stuffPublisher :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => m ()
......