...
 
Commits (4)
......@@ -18,13 +18,8 @@ import Arivi.Network
import qualified Arivi.P2P.Config as Config
import Arivi.P2P.P2PEnv as PE
import Arivi.P2P.ServiceRegistry
import Arivi.P2P.Types
import Arivi.P2P.Handler (newIncomingConnectionHandler)
import Arivi.P2P.Kademlia.LoadDefaultPeers
import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.PubSub.Env
import Arivi.P2P.PubSub.Class
import Arivi.P2P.RPC.Env
import Control.Concurrent.Async.Lifted (async, wait)
import Control.Monad.Logger
......@@ -56,17 +51,6 @@ instance HasNodeEndpoint AppM where
getHandlers = asks (handlers . nodeEndpointEnv)
getNodeIdPeerMapTVarP2PEnv = asks (tvarNodeIdPeerMap . nodeEndpointEnv)
instance HasNetworkConfig (P2PEnv r t rmsg pmsg) NetworkConfig where
networkConfig f p2p =
fmap
(\nc ->
p2p
{ nodeEndpointEnv =
(nodeEndpointEnv p2p) {PE._networkConfig = nc}
})
(f ((PE._networkConfig . nodeEndpointEnv) p2p))
instance HasPRT AppM where
getPeerReputationHistoryTableTVar = asks (tvPeerReputationHashTable . prtEnv)
getServicesReputationHashMapTVar = asks (tvServicesReputationHashMap . prtEnv)
......@@ -74,25 +58,10 @@ instance HasPRT AppM where
getReputedVsOtherTVar = asks (tvReputedVsOther . prtEnv)
getKClosestVsRandomTVar = asks (tvKClosestVsRandom . prtEnv)
instance HasTopics (P2PEnv r t rmsg pmsg) t where
topics = pubSubTopics . psEnv
instance HasSubscribers (P2PEnv r t rmsg pmsg) t where
subscribers = pubSubSubscribers . psEnv
instance HasNotifiers (P2PEnv r t rmsg pmsg) t where
notifiers = pubSubNotifiers . psEnv
instance HasInbox (P2PEnv r t rmsg pmsg) pmsg where
inbox = pubSubInbox . psEnv
instance HasCache (P2PEnv r t rmsg pmsg) pmsg where
cache = pubSubCache . psEnv
instance HasTopicHandlers (P2PEnv r t rmsg pmsg) t pmsg where
topicHandlers = pubSubHandlers . psEnv
instance HasPubSubEnv (P2PEnv r t rmsg pmsg) t pmsg where
pubSubEnv = psEnv
instance HasRpcEnv (P2PEnv r t rmsg pmsg) r rmsg where
rpcEnv = rEnv
runAppM :: P2PEnv ByteString ByteString ByteString ByteString-> AppM a -> LoggingT IO a
runAppM ::
P2PEnv ByteString ByteString ByteString ByteString
-> AppM a
-> LoggingT IO a
runAppM = flip runReaderT
{--
......
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
......@@ -17,6 +17,7 @@ import Arivi.P2P.Kademlia.Types (HasKbucket)
import qualified Arivi.P2P.Kademlia.Types as T
import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.PubSub.Env
import Arivi.P2P.PubSub.Class
import Arivi.P2P.RPC.Types
import Arivi.P2P.RPC.Env
import Arivi.P2P.PRT.Types
......@@ -119,3 +120,32 @@ data Handlers = Handlers {
, option :: forall env m r msg. (MonadReader env m, HasNodeEndpoint m, HasRpc env r msg, MonadIO m) => m (Response 'Option (Supported [r]))
, pubsub :: forall env m r t rmsg pmsg. (HasP2PEnv env m r t rmsg pmsg, MonadIO m) => NodeId -> PubSub -> ByteString -> m ByteString
}
instance HasNetworkConfig (P2PEnv r t rmsg pmsg) NetworkConfig where
networkConfig f p2p =
fmap
(\nc ->
p2p
{ nodeEndpointEnv =
(nodeEndpointEnv p2p) {Arivi.P2P.P2PEnv._networkConfig = nc}
})
(f ((Arivi.P2P.P2PEnv._networkConfig . nodeEndpointEnv) p2p))
instance HasTopics (P2PEnv r t rmsg pmsg) t where
topics = pubSubTopics . psEnv
instance HasSubscribers (P2PEnv r t rmsg pmsg) t where
subscribers = pubSubSubscribers . psEnv
instance HasNotifiers (P2PEnv r t rmsg pmsg) t where
notifiers = pubSubNotifiers . psEnv
instance HasInbox (P2PEnv r t rmsg pmsg) pmsg where
inbox = pubSubInbox . psEnv
instance HasCache (P2PEnv r t rmsg pmsg) pmsg where
cache = pubSubCache . psEnv
instance HasTopicHandlers (P2PEnv r t rmsg pmsg) t pmsg where
topicHandlers = pubSubHandlers . psEnv
instance HasPubSubEnv (P2PEnv r t rmsg pmsg) t pmsg where
pubSubEnv = psEnv
instance HasRpcEnv (P2PEnv r t rmsg pmsg) r rmsg where
rpcEnv = rEnv
{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE FunctionalDependencies#-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UndecidableInstances #-}
......
......@@ -24,7 +24,6 @@ import Control.Monad.Reader
import Data.Hashable
import Data.ByteString.Lazy
import qualified Data.Set as Set
import qualified Data.Text as T
import Control.Monad.Logger (logDebug)
-- |Handler functions might never halt, they should be run
......@@ -47,17 +46,17 @@ notifyHandler ::
-> Request ('PubSub 'Notify) (PubSubPayload t pmsg)
-> m (Response ('PubSub 'Notify) Status)
notifyHandler nid (PubSubRequest payload@(PubSubPayload (t, msg))) = do
$(logDebug) $ T.pack ("Notify received handler invoked")
$(logDebug) "Notify received handler invoked"
inboxed <- asks inbox
cached <- asks cache
h <- asks topicHandlers
resp <- handleTopic nid inboxed cached h t msg
case resp of
Ok -> do
$(logDebug) $ T.pack ("handleTopic successful notifying subscribers")
$(logDebug) "handleTopic successful notifying subscribers"
notify payload
Error -> do
$(logDebug) $ T.pack ("handleTopic unsuccessful")
$(logDebug) "handleTopic unsuccessful"
return ()
return (PubSubResponse resp)
......@@ -67,17 +66,17 @@ publishHandler ::
-> Request ('PubSub 'Publish) (PubSubPayload t pmsg)
-> m (Response ('PubSub 'Publish) Status)
publishHandler nid (PubSubRequest payload@(PubSubPayload (t, msg))) = do
$(logDebug) $ T.pack ("Publish received handler invoked")
$(logDebug) "Publish received handler invoked"
inboxed <- asks inbox
cached <- asks cache
h <- asks topicHandlers
resp <- handleTopic nid inboxed cached h t msg
case resp of
Ok -> do
$(logDebug) $ T.pack ("handleTopic successful notifying subscribers")
$(logDebug) "handleTopic successful notifying subscribers"
notify payload
Error -> do
$(logDebug) $ T.pack ("handleTopic unsuccessful")
$(logDebug) "handleTopic unsuccessful"
return ()
return (PubSubResponse resp)
......@@ -129,7 +128,7 @@ handleTopic nid inboxed cached (TopicHandlers hs) t msg = do
(\(Cache c) -> Cache (c & at msg ?~ def))
case hs ^. at t of
Just (TopicHandler h) -> do
let resp = h msg
resp <- h msg
liftIO $ putMVar def resp
return resp
Nothing -> error "Shouldn't reach here"
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
module Arivi.P2P.PubSub.Notify
......@@ -17,7 +18,6 @@ import Arivi.Utils.Set
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Monad.Except
import Control.Monad.Reader
import qualified Data.Text as T
import Control.Monad.Logger (logDebug)
notify ::
......@@ -25,7 +25,7 @@ notify ::
=> PubSubPayload t msg
-> m ()
notify req@(PubSubPayload (t, msg)) = do
$(logDebug) $ T.pack ("notify called")
$(logDebug) "notify called"
subs <- asks subscribers
inboxed <- join $ liftIO . readTVarIO <$> asks inbox
peers <- liftIO $ notifiersForMessage inboxed subs msg t
......@@ -38,10 +38,10 @@ notify req@(PubSubPayload (t, msg)) = do
(\case
Left _ -> return ()
Right (PubSubResponse Ok) -> do
$(logDebug) $ T.pack ("otify Successful")
$(logDebug) "Notify Successful"
return ()
Right (PubSubResponse Error) -> do
$(logDebug) $ T.pack ("Notify failed")
$(logDebug) "Notify failed"
return ())
responses
......
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
module Arivi.P2P.PubSub.Publish
......@@ -18,12 +19,11 @@ import Control.Applicative
import Control.Monad.Except
import Control.Monad.Reader
import Data.Set (union)
import qualified Data.Text as T
import Control.Monad.Logger (logDebug)
publish :: (HasP2PEnv env m r t rmsg msg) => PubSubPayload t msg -> m ()
publish req@(PubSubPayload (t,_)) = do
$(logDebug) $ T.pack ("publish called")
$(logDebug) "publish called"
subs <- asks subscribers
notf <- asks notifiers
nodes <- liftA2 union (liftIO $ subscribersForTopic t subs) (liftIO $ notifiersForTopic t notf)
......@@ -35,10 +35,10 @@ publish req@(PubSubPayload (t,_)) = do
(\case
Left _ -> return ()
Right (PubSubResponse Ok) -> do
$(logDebug) $ T.pack ("Publish successful ")
$(logDebug) "Publish successful "
return ()
Right (PubSubResponse Error) -> do
$(logDebug) $ T.pack ("Publish Failed")
$(logDebug) "Publish Failed"
return ())
responses
......
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
module Arivi.P2P.PubSub.Subscribe
......@@ -16,23 +17,23 @@ import Arivi.P2P.Types
import Control.Monad.Except
import Control.Monad.Reader
import qualified Data.Text as T
import Control.Monad.Logger (logDebug)
import qualified Data.Text as T
subscribe ::
(HasP2PEnv env m r t rmsg msg) => PubSubPayload t Timer -> NodeId -> m ()
subscribe req@(PubSubPayload (t,_)) nid = do
$(logDebug) $ T.pack ("subscribe called")
$(logDebug) "subscribe called"
resp <- runExceptT $ issueRequest nid (subscribeRequest req)
notf <- asks notifiers
case resp of
Left _ -> return ()
Right (PubSubResponse Ok) -> do
$(logDebug) $ T.pack ("Subscribe successful creating new Notifier")
$(logDebug) "Subscribe successful creating new Notifier"
liftIO $ newNotifier nid notf t
$(logDebug) $ T.pack ("created new Notifier with NodeId = " ++ show nid)
Right (PubSubResponse Error) -> do
$(logDebug) $ T.pack ("Subscribe failed")
$(logDebug) "Subscribe failed"
return ()
subscribeRequest :: msg -> Request ('PubSub 'Subscribe) msg
......
......@@ -35,8 +35,10 @@ import qualified Data.Set as Set
import Data.Time.Clock
import GHC.Generics (Generic)
import Control.Monad.IO.Class
newtype TopicHandler msg =
TopicHandler (msg -> Status)
TopicHandler (forall m. (MonadIO m) => msg -> m Status)
type Timer = Integer
......@@ -137,4 +139,4 @@ newNotifier nid (Notifiers notifs) t =
-- |Invariant this branch is never reached.
-- 'initPubSub' should statically make empty
-- sets for all topics in the map.
Nothing -> return ()
\ No newline at end of file
Nothing -> return ()
......@@ -30,7 +30,7 @@ sendOptionsMessage ::
-> Options r
-> m ()
sendOptionsMessage peers optionsMessage =
LAsync.mapConcurrently_ (flip sendOptionsToPeer optionsMessage) peers
LAsync.mapConcurrently_ (`sendOptionsToPeer` optionsMessage) peers
-- | Sends the Options message to a single peer and updates the Resourcers table based on the Supported message
sendOptionsToPeer ::
......
......@@ -21,18 +21,10 @@ import Arivi.Network
import qualified Arivi.P2P.Config as Config
import Arivi.P2P.P2PEnv as PE
import Arivi.P2P.ServiceRegistry
import Arivi.P2P.Types
import Arivi.P2P.Handler (newIncomingConnectionHandler)
import Arivi.P2P.Kademlia.LoadDefaultPeers
import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.PubSub.Env
import Arivi.P2P.RPC.Env
import Arivi.P2P.RPC.Types
import Arivi.P2P.PubSub.Class
import Arivi.P2P.PubSub.Types
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted (async, wait)
import Control.Monad.Logger
import Control.Monad.Reader
import Data.ByteString.Lazy as BSL (ByteString)
......@@ -63,17 +55,6 @@ instance HasNodeEndpoint AppM where
getHandlers = asks (handlers . nodeEndpointEnv)
getNodeIdPeerMapTVarP2PEnv = asks (tvarNodeIdPeerMap . nodeEndpointEnv)
instance HasNetworkConfig (P2PEnv r t rmsg pmsg) NetworkConfig where
networkConfig f p2p =
fmap
(\nc ->
p2p
{ nodeEndpointEnv =
(nodeEndpointEnv p2p) {PE._networkConfig = nc}
})
(f ((PE._networkConfig . nodeEndpointEnv) p2p))
instance HasPRT AppM where
getPeerReputationHistoryTableTVar = asks (tvPeerReputationHashTable . prtEnv)
getServicesReputationHashMapTVar = asks (tvServicesReputationHashMap . prtEnv)
......@@ -81,24 +62,6 @@ instance HasPRT AppM where
getReputedVsOtherTVar = asks (tvReputedVsOther . prtEnv)
getKClosestVsRandomTVar = asks (tvKClosestVsRandom . prtEnv)
instance HasTopics (P2PEnv r t rmsg pmsg) t where
topics = pubSubTopics . psEnv
instance HasSubscribers (P2PEnv r t rmsg pmsg) t where
subscribers = pubSubSubscribers . psEnv
instance HasNotifiers (P2PEnv r t rmsg pmsg) t where
notifiers = pubSubNotifiers . psEnv
instance HasInbox (P2PEnv r t rmsg pmsg) pmsg where
inbox = pubSubInbox . psEnv
instance HasCache (P2PEnv r t rmsg pmsg) pmsg where
cache = pubSubCache . psEnv
instance HasTopicHandlers (P2PEnv r t rmsg pmsg) t pmsg where
topicHandlers = pubSubHandlers . psEnv
instance HasPubSubEnv (P2PEnv ServiceResource ServiceTopic String String) ServiceTopic String where
pubSubEnv = psEnv
instance HasRpcEnv (P2PEnv r t rmsg pmsg) r rmsg where
rpcEnv = rEnv
runAppM :: P2PEnv ServiceResource ServiceTopic String String-> AppM a -> LoggingT IO a
runAppM = flip runReaderT
......
......@@ -5,4 +5,7 @@ trustedPeers: []
udpPort: 8080
tcpPort: 8080
myNodeId: ! "å\a»\x8AÊ×\x1F\x17%$æʶ,\x15¦\x12ÊQ\x93ÊKaGÊ\nÑTmÇ\v\x8Fï\x8D¡·\x80ÿ;Ø \x1Eß\x8B~OU_9\x11¨\x80kÙÐ\x126\x01±'0¢PQ"
\ No newline at end of file
sbound: 20
pingThreshold: 3
kademliaConcurrencyFactor: 5
hopBound: 5
......@@ -35,8 +35,14 @@ instance Hashable ServiceTopic
handlerNew :: ResourceHandler String
handlerNew = ResourceHandler (++ "Praise Jesus")
ioHello :: (MonadIO m) => String -> m Status
ioHello msg =
if msg == "HelloworldHeader"
then liftIO (Prelude.putStrLn "Ok") >> return Ok
else liftIO (Prelude.putStrLn "Error") >> return Error
handlerTopic :: TopicHandler String
handlerTopic = TopicHandler (\msg -> if msg == "HelloworldHeader" then Ok else Error)
handlerTopic = TopicHandler ioHello
-- registerHelloWorld :: (HasP2PEnv env m ServiceResource String String String) => m ()
-- registerHelloWorld =
......@@ -50,6 +56,5 @@ getHelloWorld = do
liftIO $ print "here"
liftIO $ print resource
stuffPublisher :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => m ()
stuffPublisher = publish (PubSubPayload (HelloWorldHeader, "HelloworldHeader"))
\ No newline at end of file
stuffPublisher = publish (PubSubPayload (HelloWorldHeader, "HelloworldHeader"))