...
 
Commits (3)
......@@ -85,7 +85,8 @@ library
Arivi.P2P.ServiceRegistry
Arivi.P2P.Types
Arivi.P2P.LevelDB
Arivi.P2P.RPC.Functions
Arivi.P2P.RPC.Env
Arivi.P2P.RPC.Fetch
Arivi.P2P.RPC.Types
other-modules:
Arivi.P2P.Connection
......
......@@ -24,6 +24,7 @@ import Arivi.P2P.Kademlia.LoadDefaultPeers
import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.PubSub.Env
import Arivi.P2P.PubSub.Class
import Arivi.P2P.RPC.Env
import Control.Concurrent.Async.Lifted (async, wait)
import Control.Monad.Logger
......@@ -65,12 +66,6 @@ instance HasNetworkConfig (P2PEnv r t rmsg pmsg) NetworkConfig where
})
(f ((PE._networkConfig . nodeEndpointEnv) p2p))
instance HasArchivedResourcers AppM ByteString ByteString where
archived = asks (tvarArchivedResourceToPeerMap . rpcEnv)
instance HasTransientResourcers AppM ByteString ByteString where
transient = asks (tvarDynamicResourceToPeerMap . rpcEnv)
instance HasPRT AppM where
getPeerReputationHistoryTableTVar = asks (tvPeerReputationHashTable . prtEnv)
......@@ -94,6 +89,9 @@ instance HasTopicHandlers (P2PEnv r t rmsg pmsg) t pmsg where
instance HasPubSubEnv (P2PEnv r t rmsg pmsg) t pmsg where
pubSubEnv = psEnv
instance HasRpcEnv (P2PEnv r t rmsg pmsg) r rmsg where
rpcEnv = rEnv
runAppM :: P2PEnv ByteString ByteString ByteString ByteString-> AppM a -> LoggingT IO a
runAppM = flip runReaderT
......@@ -133,7 +131,7 @@ defaultConfig path = do
runNode :: String -> IO ()
runNode configPath = do
config <- Config.readConfig configPath
env <- mkP2PEnv config
env <- mkP2PEnv config undefined undefined
runFileLoggingT (toS $ Config.logFile config) $
-- runStdoutLoggingT $
runAppM
......@@ -165,7 +163,7 @@ runNode configPath = do
runBSNode :: String -> IO ()
runBSNode configPath = do
config <- Config.readConfig configPath
env <- mkP2PEnv config
env <- mkP2PEnv config undefined undefined
runFileLoggingT (toS $ Config.logFile config) $
-- runStdoutLoggingT $
runAppM
......
......@@ -10,27 +10,23 @@ import Arivi.Network
import qualified Arivi.P2P.Config as Config
import Arivi.P2P.PeerMaintainer
import Arivi.P2P.Kademlia.LoadDefaultPeers
import Arivi.P2P.Kademlia.LoadReputedPeers
import Arivi.P2P.PRT.Instance (getAllReputedNodes)
import Arivi.P2P.RPC.Types
import Arivi.P2P.RPC.Functions
-- import Arivi.P2P.Kademlia.LoadReputedPeers
-- import Arivi.P2P.PRT.Instance (getAllReputedNodes)
import qualified Data.HashMap.Strict as HM
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted
import Control.Monad.Except
-- | Called by the service in Xoken core
initP2P :: (HasP2PEnv env m r t rmsg pmsg) => Config.Config -> HM.HashMap r (ResourceHandler r rmsg) -> m ()
initP2P config resourceHandlers = do
initP2P :: (HasP2PEnv env m r t rmsg pmsg) => Config.Config -> m ()
initP2P config = do
_ <- async (runUdpServer (show (Config.udpPort config)) newIncomingConnectionHandler)
_ <- async (runTcpServer (show (Config.tcpPort config)) newIncomingConnectionHandler)
loadDefaultPeers (Config.trustedPeers config)
liftIO $ threadDelay 5000000
reputedNodes <- getAllReputedNodes
loadReputedPeers reputedNodes -- What do I do with an error. Doing nothing for now
mapM_ (\(resource, handler) -> registerResource resource handler Archived) (HM.toList resourceHandlers)
-- reputedNodes <- getAllReputedNodes
-- loadReputedPeers reputedNodes -- What do I do with an error. Doing nothing for now
_ <- async $ fillQuotas 5 -- hardcoding here assuming each resource requires 5 peers
return ()
-- wait tcpTid
......
......@@ -38,7 +38,7 @@ processRequest connHandle p2pMessage peerNodeId = do
Rpc -> serialise <$> rpc hh (deserialise $ payload p2pMessage)
Kademlia -> serialise <$> kademlia hh (deserialise $ payload p2pMessage)
Option -> serialise <$> option hh
PubSub p -> pubsub hh undefined p (payload p2pMessage)
PubSub p -> pubsub hh peerNodeId p (payload p2pMessage)
let p2pResponse = generateP2PMessage (uuid p2pMessage) (messageType p2pMessage) responseMsg
res <- LE.try $ send connHandle (serialise p2pResponse)
case res of
......
......@@ -18,18 +18,16 @@ import qualified Arivi.P2P.Kademlia.Types as T
import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.PubSub.Env
import Arivi.P2P.RPC.Types
import Arivi.P2P.RPC.Env
import Arivi.P2P.PRT.Types
import Arivi.Utils.Logging
import Arivi.Utils.Statsd
import Codec.Serialise
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Concurrent.STM (TVar, newTVarIO)
-- import Control.Lens.TH
import Data.ByteString.Lazy (ByteString)
import Data.HashMap.Strict as HM
import Data.Hashable
import Data.Ratio (Rational, (%))
-- |Upon writing services, we might discover that topic (t) and resource (r)
......@@ -40,7 +38,7 @@ type HasP2PEnv env m r t rmsg pmsg
= ( HasNodeEndpoint m
, HasLogging m
, HasPubSub env t pmsg
, HasRpc m r rmsg
, HasRpc env r rmsg
, HasKbucket m
, HasStatsdClient m
, HasPRT m
......@@ -60,17 +58,6 @@ mkNodeEndpoint nc handlers ne = do
peerMap <- newTVarIO HM.empty
return $ NodeEndpointEnv nc peerMap handlers ne
data RpcEnv r m = RpcEnv {
tvarArchivedResourceToPeerMap :: TVar (ArchivedResourceToPeerMap r m)
, tvarDynamicResourceToPeerMap :: TVar (TransientResourceToPeerMap r m)
}
mkRpcEnv :: IO (RpcEnv r m)
mkRpcEnv =
RpcEnv <$> newTVarIO (ArchivedResourceToPeerMap HM.empty)
<*> newTVarIO (TransientResourceToPeerMap HM.empty)
data KademliaEnv = KademliaEnv {
kbucket :: T.Kbucket Int [T.Peer]
}
......@@ -88,7 +75,7 @@ mkKademlia NetworkConfig{..} sbound pingThreshold kademliaConcurrencyFactor hopB
data P2PEnv r t rmsg pmsg = P2PEnv {
nodeEndpointEnv :: NodeEndpointEnv
, rpcEnv :: RpcEnv r rmsg
, rEnv :: RpcEnv r rmsg
, psEnv :: PubSubEnv t pmsg
, kademliaEnv :: KademliaEnv
, statsdClient :: StatsdClient
......@@ -101,20 +88,6 @@ class (HasSecretKey m) => HasNodeEndpoint m where
getHandlers :: m Handlers
getNodeIdPeerMapTVarP2PEnv :: m (TVar NodeIdPeerMap)
type HasRpc m r msg
= ( HasArchivedResourcers m r msg
, HasTransientResourcers m r msg
, Serialise r
, Serialise msg
, Hashable r
, Eq r
)
class HasArchivedResourcers m r msg | m -> r msg where
archived :: m (TVar (ArchivedResourceToPeerMap r msg))
class HasTransientResourcers m r msg | m -> r msg where
transient :: m (TVar (TransientResourceToPeerMap r msg))
data PRTEnv = PRTEnv {
tvPeerReputationHashTable :: TVar PeerReputationHistoryTable
......@@ -140,76 +113,9 @@ mkPRTEnv = do
kClosestVsRandomTVar <- newTVarIO (1 % 1 :: Rational)
return (PRTEnv peerReputationHashTable servicesReputationHashMapTVar p2pReputationHashMapTVar reputedVsOtherTVar kClosestVsRandomTVar)
-- class HasPubSub where
-- getWatcherTableP2PEnv :: TVar WatchersTable
-- getNotifiersTableP2PEnv :: TVar NotifiersTable
-- getTopicHandlerMapP2PEnv :: TVar TopicHandlerMap
-- getMessageHashMapP2PEnv :: TVar MessageHashMap
-- class (T.HasKbucket m, HasStatsdClient m, HasNetworkEnv m, HasSecretKey m) =>
-- HasP2PEnv m
-- where
-- getP2PEnv :: m P2PEnv
-- getAriviTVarP2PEnv :: m NetworkConfig
-- getNodeIdPeerMapTVarP2PEnv :: m (TVar NodeIdPeerMap)
-- getArchivedResourceToPeerMapP2PEnv :: m (TVar ArchivedResourceToPeerMap)
-- getMessageTypeMapP2PEnv :: m Handlers
-- getWatcherTableP2PEnv :: m (TVar WatchersTable)
-- getNotifiersTableP2PEnv :: m (TVar NotifiersTable)
-- getTopicHandlerMapP2PEnv :: m (TVar TopicHandlerMap)
-- getMessageHashMapP2PEnv :: m (TVar MessageHashMap)
-- getTransientResourceToPeerMap :: m (TVar TransientResourceToPeerMap)
-- makeP2PEnvironment ::
-- NetworkConfig
-- -> Int
-- -> Int
-- -> Int
-- -> IO (P2PEnv r)
-- makeP2PEnvironment nc@NetworkConfig{..} sbound pingThreshold kademliaConcurrencyFactor = do
-- r2pmap <- newTVarIO (ArchivedResourceToPeerMap HM.empty)
-- dr2pmap <- newTVarIO (TransientResourceToPeerMap HM.empty)
-- let rpcEnv = RpcEnv r2pmap dr2pmap
-- kb <-
-- createKbucket
-- (T.Peer (_nodeId, T.NodeEndPoint _ip _tcpPort _udpPort))
-- sbound
-- pingThreshold
-- kademliaConcurrencyFactor
-- let kademliaEnv = KademliaEnv kb
-- -- watcherMap <- newTVarIO HM.empty
-- -- notifierMap <- newTVarIO HM.empty
-- -- topicHandleMap <- newTVarIO HM.empty
-- -- messageMap <- newTVarIO HM.empty
-- -- let pubSubEnv = PubSubEnv
-- return P2PEnv {
-- rpcEnv = rpcEnv
-- , kademliaEnv = kademliaEnv
-- -- , pubSubEnv :: PubSubEnv
-- }
-- P2PEnv
-- { _networkConfig = nc
-- , tvarNodeIdPeerMap = nmap
-- , tvarArchivedResourceToPeerMap = r2pmap
-- , tvarWatchersTable = watcherMap
-- , tvarNotifiersTable = notifierMap
-- , tvarTopicHandlerMap = topicHandleMap
-- , tvarMessageHashMap = messageMap
-- , tvarDynamicResourceToPeerMap = dr2pmap
-- , kbucket = kb
-- }
-- data Handlers = Handlers
-- { rpc :: forall m r t msg. (HasRpc m r, HasNodeEndpoint m, Eq r, Hashable r, Serialise r, MonadIO m) => Request t msg -> m (Response t msg)
-- , kademlia :: forall m t msg. (HasKbucket m, Serialise msg) => Request t msg -> m (Response t msg)
-- }
data Handlers = Handlers {
rpc :: forall m r msg. (HasNodeEndpoint m, HasRpc m r msg, MonadIO m) => Request 'Rpc (RpcPayload r msg) -> m (Response 'Rpc (RpcPayload r msg))
rpc :: forall env m r msg. (MonadReader env m, HasNodeEndpoint m, HasRpc env r msg, MonadIO m) => Request 'Rpc (RpcPayload r msg) -> m (Response 'Rpc (RpcPayload r msg))
, kademlia :: forall env m r t rmsg pmsg. (HasP2PEnv env m r t rmsg pmsg) => Request 'Kademlia T.PayLoad -> m (Response 'Kademlia T.PayLoad)
, option :: forall m r msg. (HasNodeEndpoint m, HasRpc m r msg, MonadIO m) => m (Response 'Option (Supported [r]))
, pubsub :: forall env m t msg. (MonadReader env m, HasPubSub env t msg, MonadIO m) => NodeId -> PubSub -> ByteString -> m ByteString
, option :: forall env m r msg. (MonadReader env m, HasNodeEndpoint m, HasRpc env r msg, MonadIO m) => m (Response 'Option (Supported [r]))
, pubsub :: forall env m r t rmsg pmsg. (HasP2PEnv env m r t rmsg pmsg, MonadIO m) => NodeId -> PubSub -> ByteString -> m ByteString
}
......@@ -9,45 +9,61 @@ import Arivi.P2P.Types
import Arivi.P2P.MessageHandler.HandlerTypes
import qualified Arivi.P2P.Kademlia.Types as KademliaTypes
import Arivi.P2P.PRT.Instance (getKNodes)
import Arivi.P2P.PubSub.Class
import Arivi.P2P.PubSub.Types
import Arivi.P2P.PubSub.Subscribe
import Arivi.P2P.RPC.Types
import Arivi.P2P.RPC.Env
import Arivi.P2P.RPC.SendOptions
import qualified Data.HashMap.Strict as HM
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted (mapConcurrently_)
import Control.Concurrent.STM
import Control.Monad.Reader
import Control.Monad.Except (runExceptT)
-- import Control.Monad (unless, forever)
import Control.Lens
import Data.Set as Set
-- | Sends subscribe messages for each topic to every passed peer.
-- | TODO: Batched subscribes for multiple topics.
sendSubscribes :: (HasP2PEnv env m r t rmsg pmsg) => [NodeId] -> m ()
sendSubscribes nodeList = do
topicList <- asks topics
mapM_ (subscribeForTopic nodeList) (Set.toList topicList)
subscribeForTopic :: (HasP2PEnv env m r t rmsg msg) => [NodeId] -> t -> m ()
subscribeForTopic nodeList t = mapConcurrently_ (subscribe (PubSubPayload (t, 100000))) nodeList -- Hardcoding timer value for now. No logic for handling it currently. Subscriptions are for the duration of a network connection as of now
-- | fills up the peer list for resource. Since Options message is not for a specific resource, check after each invocation of sendOptions if the number of peers if less than required quota for any resource. Recursively keep calling till all the quotas have been satisfied.
-- | TODO: Logging
fillQuotas :: (HasP2PEnv env m r t rmsg pmsg) => Integer -> m ()
fillQuotas numPeers = forever $ do
archivedMapTVar <- archived
archivedMap <- liftIO (readTVarIO archivedMapTVar)
filled <- liftIO $ isFilled archivedMap numPeers
unless filled $ do
rpcRecord <- asks rpcEnv
let Resourcers resourcers = rpcResourcers rpcRecord
filledResources <- liftIO $ isFilled resourcers numPeers
Notifiers notif <- asks notifiers
_ <- liftIO $ isFilled notif numPeers
unless filledResources $ do
res <- runExceptT $ getKNodes numPeers -- Repetition of peers
-- liftIO $ threadDelay (40 * 1000000)
case res of
Left _ -> liftIO $ threadDelay (40 * 1000000)
Right peers -> do
peerNodeIds <- addPeerFromKademlia peers
sendOptionsMessage peerNodeIds Options
liftIO $ print "waiting"
liftIO $ threadDelay (40 * 1000000)
-- fillQuotas numPeers
sendSubscribes peerNodeIds
liftIO $ threadDelay (40 * 1000000)
isFilledHelper ::Int -> [TVar [a]] -> IO Bool
isFilledHelper ::Int -> [TVar (Set a)] -> IO Bool
isFilledHelper _ [] = return True
isFilledHelper minimumNodes l = not <$> (fmap (any (< minimumNodes)) <$> mapM (fmap length <$> readTVarIO)) l
isFilledHelper minimumNodes l = not <$> (fmap (any (< minimumNodes)) <$> mapM (fmap Set.size <$> readTVarIO)) l
-- | Returns true if all the resources have met the minimumNodes quota and false otherwise
isFilled :: ArchivedResourceToPeerMap r msg -> Integer -> IO Bool
isFilled archivedMap minNodes = do
let resourceToPeerList = HM.toList (getArchivedMap archivedMap)
isFilledHelper (fromIntegral minNodes) (fmap (snd . snd) resourceToPeerList)
isFilled :: HM.HashMap a (TVar (Set b)) -> Integer -> IO Bool
isFilled hm minNodes = do
let l = HM.toList hm
isFilledHelper (fromIntegral minNodes) (fmap snd l)
-- | add the peers returned by Kademlia to the PeerDetails HashMap
addPeerFromKademlia ::
......
......@@ -51,14 +51,17 @@ type HasPubSub env t msg
)
mkPubSub :: IO (PubSubEnv t msg)
mkPubSub =
PubSubEnv <$> pure Set.empty
<*> pure (Subscribers HM.empty)
<*> pure (Notifiers HM.empty)
mkPubSub :: (Ord t, Hashable t) => TopicHandlers t msg -> IO (PubSubEnv t msg)
mkPubSub (TopicHandlers h) = do
let topicList = HM.keys h
subTVars <- mapM (\_ -> newTVarIO Set.empty) topicList
notifTVars <- mapM (\_ -> newTVarIO Set.empty) topicList
PubSubEnv <$> pure (Set.fromList topicList)
<*> pure (Subscribers (HM.fromList (zip topicList subTVars)))
<*> pure (Notifiers (HM.fromList (zip topicList notifTVars)))
<*> newTVarIO (Inbox HM.empty)
<*> newTVarIO (Cache HM.empty)
<*> pure (TopicHandlers HM.empty)
<*> pure (TopicHandlers h)
instance HasTopics (PubSubEnv t msg) t where
topics = pubSubTopics
......
......@@ -9,9 +9,10 @@ module Arivi.P2P.PubSub.Handler
import Arivi.P2P.MessageHandler.HandlerTypes (NodeId)
import Arivi.P2P.Types
import Arivi.P2P.P2PEnv
import Arivi.P2P.PubSub.Class
import Arivi.P2P.PubSub.Env
import Arivi.P2P.PubSub.Types
import Arivi.P2P.PubSub.Notify
import Codec.Serialise
import Control.Concurrent.MVar
......@@ -27,9 +28,7 @@ import qualified Data.Set as Set
-- in a separate thread.
pubSubHandler ::
( MonadReader env m
, HasPubSub env t msg
, MonadIO m
( HasP2PEnv env m r t rmsg pmsg
)
=> NodeId
-> PubSub
......@@ -40,28 +39,34 @@ pubSubHandler nid Publish req = serialise <$> publishHandler nid (deserialise re
pubSubHandler nid Notify req = serialise <$> notifyHandler nid (deserialise req)
notifyHandler ::
( MonadReader env m
, HasPubSub env t msg, MonadIO m)
( HasP2PEnv env m r t rmsg pmsg)
=> NodeId
-> Request ('PubSub 'Notify) (PubSubPayload t msg)
-> m (Response ('PubSub 'Notify) (PubSubPayload t msg))
notifyHandler nid (PubSubRequest (PubSubPayload (t, msg))) = do
-> Request ('PubSub 'Notify) (PubSubPayload t pmsg)
-> m (Response ('PubSub 'Notify) Status)
notifyHandler nid (PubSubRequest payload@(PubSubPayload (t, msg))) = do
inboxed <- asks inbox
cached <- asks cache
handlers <- asks topicHandlers
(PubSubResponse . PubSubPayload . (t, )) <$> handleTopic nid inboxed cached handlers t msg
h <- asks topicHandlers
resp <- handleTopic nid inboxed cached h t msg
case resp of
Ok -> notify payload
Error -> return ()
return (PubSubResponse resp)
publishHandler ::
( MonadReader env m
, HasPubSub env t msg, MonadIO m)
( HasP2PEnv env m r t rmsg pmsg)
=> NodeId
-> Request ('PubSub 'Publish) (PubSubPayload t msg)
-> m (Response ('PubSub 'Publish) (PubSubPayload t msg))
publishHandler nid (PubSubRequest (PubSubPayload (t, msg))) = do
-> Request ('PubSub 'Publish) (PubSubPayload t pmsg)
-> m (Response ('PubSub 'Publish) Status)
publishHandler nid (PubSubRequest payload@(PubSubPayload (t, msg))) = do
inboxed <- asks inbox
cached <- asks cache
handlers <- asks topicHandlers
(PubSubResponse . PubSubPayload . (t, )) <$> handleTopic nid inboxed cached handlers t msg
h <- asks topicHandlers
resp <- handleTopic nid inboxed cached h t msg
case resp of
Ok -> notify payload
Error -> return ()
return (PubSubResponse resp)
subscribeHandler ::
( MonadReader env m
......@@ -86,8 +91,8 @@ handleTopic ::
-> TopicHandlers t msg
-> t
-> msg
-> m msg
handleTopic nid inboxed cached (TopicHandlers handlers) t msg = do
-> m Status
handleTopic nid inboxed cached (TopicHandlers hs) t msg = do
-- Add node to the inbox
Inbox inbox' <- liftIO $ readTVarIO inboxed
case inbox' ^. at msg of
......@@ -109,9 +114,9 @@ handleTopic nid inboxed cached (TopicHandlers handlers) t msg = do
modifyTVar
cached
(\(Cache c) -> Cache (c & at msg ?~ def))
case handlers ^. at t of
case hs ^. at t of
Just (TopicHandler h) -> do
resp <- h msg
let resp = h msg
liftIO $ putMVar def resp
return resp
Nothing -> error "Shouldn't reach here"
......@@ -36,7 +36,7 @@ import Data.Time.Clock
import GHC.Generics (Generic)
newtype TopicHandler msg =
TopicHandler (forall m . msg -> m msg)
TopicHandler (msg -> Status)
type Timer = Integer
......@@ -51,7 +51,7 @@ newtype Notifiers t = Notifiers (HM.HashMap t (TVar (Set NodeId)))
newtype Inbox msg = Inbox (HM.HashMap msg (TVar (Set NodeId)))
newtype Cache msg = Cache (HM.HashMap msg (MVar msg))
newtype Cache msg = Cache (HM.HashMap msg (MVar Status))
newtype TopicHandlers t msg = TopicHandlers (HM.HashMap t (TopicHandler msg))
......@@ -132,7 +132,8 @@ newNotifier ::
-> IO ()
newNotifier nid (Notifiers notifs) t =
case notifs ^. at t of
Just x -> atomically $ modifyTVar x (Set.insert nid)
Just x ->
atomically $ modifyTVar x (Set.insert nid)
-- |Invariant this branch is never reached.
-- 'initPubSub' should statically make empty
-- sets for all topics in the map.
......
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE ConstraintKinds #-}
module Arivi.P2P.RPC.Env
( module Arivi.P2P.RPC.Env
) where
import Arivi.P2P.RPC.Types
import Codec.Serialise
import Control.Concurrent.STM.TVar (newTVarIO)
import Data.HashMap.Strict as HM
import Data.Hashable
import Data.Set as Set
data RpcEnv r msg = RpcEnv {
rpcResourcers :: Resourcers r
, rpcHandlers :: ResourceHandlers r msg
}
class HasRpcEnv env r msg | env -> r msg where
rpcEnv :: env -> RpcEnv r msg
type HasRpc env r msg =
( HasRpcEnv env r msg
, Eq r, Ord r, Hashable r, Serialise r
, Eq msg, Hashable msg, Serialise msg
)
mkRpc :: (Ord r, Hashable r) => ResourceHandlers r msg -> IO (RpcEnv r msg)
mkRpc (ResourceHandlers h) = do
let resourceList = HM.keys h
resTVars <- mapM (\_ -> newTVarIO Set.empty) resourceList
RpcEnv <$> pure (Resourcers (HM.fromList (zip resourceList resTVars)))
<*> pure (ResourceHandlers h)
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GADTs #-}
module Arivi.P2P.RPC.Fetch
( fetchResource
, fetchResourceForMessage
) where
import Arivi.P2P.Types
import Arivi.P2P.Exception
import Arivi.P2P.MessageHandler.NodeEndpoint
import Arivi.P2P.MessageHandler.HandlerTypes (NodeId)
import Arivi.P2P.P2PEnv
import Arivi.P2P.PubSub.Class
import Arivi.P2P.PubSub.Types
import Arivi.P2P.RPC.Types
import Arivi.P2P.RPC.Env
import Control.Concurrent.STM.TVar
import Control.Lens
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.STM
import qualified Data.Set as Set
-- | Try fetching resource from a list of nodes. Return first successful response or return an error if didn't get a successfull response from any peer
sendResourceRequest ::
(HasP2PEnv env m r t msg pmsg)
=> [NodeId]
-> RpcPayload r msg
-> m (Either AriviP2PException (RpcPayload r msg))
sendResourceRequest [] _ = return (Left RPCResourceNotFoundException)
sendResourceRequest (currPeer:rest) msg = do
res <- runExceptT $ issueRequest currPeer (RpcRequest msg)
case res of
Left _ -> sendResourceRequest rest msg
Right (RpcResponse payload) ->
case payload of
resp@(RpcPayload _ _ ) -> return (Right resp)
RpcError _ -> sendResourceRequest rest msg
-- | Called by the service to fetch a resource. P2P decides best peer to ask for the resource.
fetchResource ::
(HasP2PEnv env m r t msg pmsg)
=> RpcPayload r msg
-> m (Either AriviP2PException (RpcPayload r msg))
fetchResource payload@(RpcPayload resource _) = do
rpcRecord <- asks rpcEnv
let Resourcers resourcers = rpcResourcers rpcRecord
case resourcers ^. at resource of
Just x -> do
nodeSet <- liftIO $ atomically $ readTVar x
sendResourceRequest (Set.toList nodeSet) payload
Nothing -> return (Left RPCResourceNotFoundException)
fetchResource (RpcError _) = error "Change RpcPayload constructor"
-- | Called by the service to fetch a resource as a result of a notification
fetchResourceForMessage ::
(HasP2PEnv env m r t msg pmsg)
=> pmsg
-> RpcPayload r msg
-> m (Either AriviP2PException (RpcPayload r msg))
fetchResourceForMessage storedMsg payload@(RpcPayload _ _) = do
Inbox inboxed <- join $ liftIO . readTVarIO <$> asks inbox
case inboxed ^. at storedMsg of
Just nodeListTVar -> do
nodeList <- (liftIO . readTVarIO) nodeListTVar
sendResourceRequest (Set.toList nodeList) payload -- does not make sense to pattern match here on the result and call fetchResource again. If the nodes who sent the notification don't have the resource, why request the same resource from guys who didn't send the notification
Nothing -> fetchResource payload
fetchResourceForMessage _ (RpcError _) = error "Change RpcPayload constructor"
\ No newline at end of file
......@@ -8,41 +8,32 @@ module Arivi.P2P.RPC.Handler
) where
import Arivi.P2P.Types
import Arivi.P2P.Exception
import Arivi.P2P.P2PEnv
import Arivi.P2P.RPC.Types
import Control.Concurrent.STM.TVar
import Control.Exception
import Arivi.P2P.RPC.Env
import Control.Lens
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Except
import Control.Monad.Reader
import qualified Data.HashMap.Strict as HM
import Control.Applicative
rpcHandler ::
forall m r msg. (HasNodeEndpoint m, HasRpc m r msg, MonadIO m)
( MonadReader env m
, HasRpc env r msg)
=> Request 'Rpc (RpcPayload r msg)
-> m (Response 'Rpc (RpcPayload r msg))
rpcHandler (RpcRequest payload@(RpcPayload resource _)) = RpcResponse <$> do
archivedResourceMap <- archived
archivedMap <- (liftIO . readTVarIO) archivedResourceMap
transientResourceMap <- transient
transientMap <- (liftIO . readTVarIO) transientResourceMap
let entry = getTransientMap transientMap ^. at resource
<|> getArchivedMap archivedMap ^. at resource
case entry of
Nothing -> throw RPCHandlerResourceNotFoundException
Just entryMap -> do
let ResourceHandler resourceHandler = fst entryMap
return (resourceHandler payload)
rpcHandler (RpcRequest (RpcError _)) = error "Change RpcPayload constructor"
rpcHandler (RpcRequest (RpcPayload resource msg)) = RpcResponse <$> do
rpcRecord <- asks rpcEnv
let ResourceHandlers h = rpcHandlers rpcRecord
case h ^. at resource of
Just (ResourceHandler f) -> return (RpcPayload resource (f msg))
Nothing -> error "Shouldn't reach here. Will change this to single handler eventually"
rpcHandler (RpcRequest (RpcError _)) = error "Shouldn't get an error message as request"
-- | takes an options message and returns a supported message
optionsHandler ::
forall m r msg. (HasNodeEndpoint m, HasRpc m r msg, MonadIO m)
forall env m r msg. (MonadReader env m, HasRpc env r msg)
=> m (Response 'Option (Supported [r]))
optionsHandler = OptionResponse <$> do
archivedResourceMapTVar <- archived
archivedResourceMap <- (liftIO . readTVarIO) archivedResourceMapTVar
return (Supported (HM.keys (getArchivedMap archivedResourceMap)))
rpcRecord <- asks rpcEnv
let ResourceHandlers h = rpcHandlers rpcRecord
return (Supported (HM.keys h))
......@@ -9,17 +9,20 @@ import Arivi.P2P.MessageHandler.NodeEndpoint
import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.P2PEnv
import Arivi.P2P.RPC.Types
import Arivi.P2P.RPC.Env
import Arivi.P2P.Types
import qualified Control.Concurrent.Async.Lifted as LAsync(mapConcurrently_)
import Control.Concurrent.STM.TVar
import Control.Lens
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.STM
import Data.HashMap.Strict as HM
import Data.Hashable
import qualified Data.Set as Set
--This function will send the options message to all the peers in [NodeId] on separate threads
--This is the top level function that will be exposed
sendOptionsMessage ::
( HasP2PEnv env m r t rmsg pmsg
)
......@@ -29,11 +32,7 @@ sendOptionsMessage ::
sendOptionsMessage peers optionsMessage =
LAsync.mapConcurrently_ (flip sendOptionsToPeer optionsMessage) peers
-- this function runs on each lightweight thread
-- two major functions
-- 1. Formulate and send options message
-- 2. Update the hashMap based on the supported message returned
-- blocks while waiting for a response from the Other Peer
-- | Sends the Options message to a single peer and updates the Resourcers table based on the Supported message
sendOptionsToPeer ::
forall env m r t rmsg pmsg. (HasP2PEnv env m r t rmsg pmsg)
=> NodeId
......@@ -45,49 +44,25 @@ sendOptionsToPeer recievingPeerNodeId optionsMsg = do
case res of
Left _ -> return (Left SendOptionsFailedException)
Right (OptionResponse (Supported resources :: Supported [r])) ->
Right <$> updateResourcePeers (recievingPeerNodeId, resources)
Right <$> updateResourcers recievingPeerNodeId resources
-- this wrapper will update the hashMap based on the supported message returned by the peer
updateResourcePeers ::
(HasP2PEnv env m r t rmsg pmsg) => (NodeId, [r]) -> m ()
updateResourcePeers peerResourceTuple = do
archivedResourceToPeerMapTvar <- archived
archivedResourceToPeerMap <-
liftIO $ readTVarIO archivedResourceToPeerMapTvar
let mNode = fst peerResourceTuple
let listOfResources = snd peerResourceTuple
_ <-
liftIO $
updateResourcePeersHelper
mNode
listOfResources
archivedResourceToPeerMap
return ()
updateResourcers :: (MonadReader env m, HasRpc env r msg, MonadIO m)
=> NodeId
-> [r]
-> m ()
updateResourcers nId resourceList = do
rpcRecord <- asks rpcEnv
let resourcers = rpcResourcers rpcRecord
mapM_ (updateResourcers' nId resourcers) resourceList
-- adds the peer to the TQueue of each resource
-- lookup for the current resource in the HashMap
-- assumes that the resourceIDs are present in the HashMap
-- cannot add new currently because the serviceID is not available
updateResourcePeersHelper :: (Resource r) =>
NodeId -> [r] -> ArchivedResourceToPeerMap r msg -> IO Int
updateResourcePeersHelper _ [] _ = return 0
updateResourcePeersHelper mNodeId (currResource:listOfResources) archivedResourceToPeerMap = do
let temp = HM.lookup currResource (getArchivedMap archivedResourceToPeerMap) -- check for lookup returning Nothing
case temp of
Nothing ->
updateResourcePeersHelper
mNodeId
listOfResources
archivedResourceToPeerMap
Just entry -> do
let nodeListTVar = snd entry
atomically
(do nodeList <- readTVar nodeListTVar
let updatedList = nodeList ++ [mNodeId]
writeTVar nodeListTVar updatedList)
tmp <-
updateResourcePeersHelper
mNodeId
listOfResources
archivedResourceToPeerMap
updateResourcers' ::
(Ord r, Hashable r, MonadIO m)
=> NodeId
-> Resourcers r
-> r
-> m ()
updateResourcers' nid (Resourcers resourcers) resource =
case resourcers ^. at resource of
Just x -> liftIO $ atomically $ modifyTVar x (Set.insert nid)
-- | Shouldn't reach the nothing branch as all resources should be registered
Nothing -> return ()
......@@ -2,50 +2,24 @@
{-# LANGUAGE Rank2Types #-}
module Arivi.P2P.RPC.Types
( ArchivedResourceToPeerMap(..)
, NodeId
, ResourceId
, ServiceMessage
, ResourceHandler(..)
, TransientResourceToPeerMap(..)
-- , ResourceType(..)
, Options(..)
, Supported(..)
, ArchivedOrTransient(..)
( module Arivi.P2P.RPC.Types
) where
import Arivi.P2P.MessageHandler.HandlerTypes (NodeId)
import Codec.Serialise (Serialise)
import Control.Concurrent.STM.TVar
import qualified Data.ByteString.Lazy as Lazy (ByteString)
import Data.HashMap.Strict as HM
import Data.Hashable
import Data.Set (Set)
import GHC.Generics (Generic)
import Arivi.P2P.Types (RpcPayload(..))
type ServiceMessage = Lazy.ByteString
type ResourceId = String
newtype Resourcers r = Resourcers (HM.HashMap r (TVar (Set NodeId)))
newtype ArchivedResourceToPeerMap r m = ArchivedResourceToPeerMap {
getArchivedMap :: HM.HashMap r (ResourceHandler r m, TVar [NodeId])
}
newtype ResourceHandler r msg = ResourceHandler (RpcPayload r msg -> RpcPayload r msg)
newtype ResourceHandler msg = ResourceHandler (msg -> msg)
newtype ResourceHandlers r msg = ResourceHandlers (HM.HashMap r (ResourceHandler msg))
data Options r = Options deriving (Eq, Ord, Show, Generic, Serialise)
data Supported r = Supported r deriving(Eq, Ord, Generic, Serialise, Hashable)
newtype TransientResourceToPeerMap r msg = TransientResourceToPeerMap {
getTransientMap :: HM.HashMap r (ResourceHandler r msg, TVar [NodeId])
}
data ArchivedOrTransient
= Archived
| Transient
deriving (Eq, Ord, Show, Generic)
instance Serialise ArchivedOrTransient
......@@ -6,21 +6,26 @@ import Arivi.Env
import qualified Arivi.P2P.Config as Config
import Arivi.P2P.PubSub.Handler
import Arivi.P2P.PubSub.Env
import Arivi.P2P.PubSub.Types
import Arivi.P2P.Kademlia.MessageHandler
import Arivi.P2P.P2PEnv
import Arivi.P2P.RPC.Handler
import Arivi.P2P.RPC.Env
import Arivi.P2P.RPC.Types
import Arivi.P2P.Types
import Arivi.Utils.Statsd
import Data.Hashable
mkHandlers :: Handlers
mkHandlers = Handlers rpcHandler kademliaMessageHandler optionsHandler pubSubHandler
mkP2PEnv :: Config.Config -> IO (P2PEnv r t rmsg pmsg)
mkP2PEnv config = do
mkP2PEnv :: (Ord t, Hashable t, Ord r, Hashable r) => Config.Config -> ResourceHandlers r rmsg-> TopicHandlers t pmsg -> IO (P2PEnv r t rmsg pmsg)
mkP2PEnv config rh th = do
let nc = NetworkConfig (Config.myNodeId config) (Config.myIp config) (Config.tcpPort config) (Config.udpPort config)
let networkEnv = mkAriviEnv (read $ show $ Config.tcpPort config) (read $ show $ Config.udpPort config) (Config.secretKey config)
P2PEnv <$> mkNodeEndpoint nc mkHandlers networkEnv <*> mkRpcEnv <*> mkPubSub <*> mkKademlia nc (Config.sbound config) (Config.pingThreshold config) (Config.kademliaConcurrencyFactor config) (Config.hopBound config) <*> createStatsdClient "127.0.0.1" 8080 "statsdPrefix" <*> mkPRTEnv
P2PEnv <$> mkNodeEndpoint nc mkHandlers networkEnv <*> mkRpc rh <*> mkPubSub th <*> mkKademlia nc (Config.sbound config) (Config.pingThreshold config) (Config.kademliaConcurrencyFactor config) (Config.hopBound config) <*> createStatsdClient "127.0.0.1" 8080 "statsdPrefix" <*> mkPRTEnv
-- makeP2Pinstance ::
-- NodeId
......
......@@ -26,7 +26,10 @@ import Arivi.P2P.Handler (newIncomingConnectionHandler)
import Arivi.P2P.Kademlia.LoadDefaultPeers
import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.PubSub.Env
import Arivi.P2P.RPC.Env
import Arivi.P2P.RPC.Types
import Arivi.P2P.PubSub.Class
import Arivi.P2P.PubSub.Types
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted (async, wait)
......@@ -41,7 +44,7 @@ import Data.Text
import System.Directory (doesPathExist)
import System.Environment (getArgs)
type AppM = ReaderT (P2PEnv ServiceResource ByteString String ByteString) (LoggingT IO)
type AppM = ReaderT (P2PEnv ServiceResource ServiceTopic String String) (LoggingT IO)
instance HasNetworkEnv AppM where
getEnv = asks (ariviNetworkEnv . nodeEndpointEnv)
......@@ -70,12 +73,6 @@ instance HasNetworkConfig (P2PEnv r t rmsg pmsg) NetworkConfig where
})
(f ((PE._networkConfig . nodeEndpointEnv) p2p))
instance HasArchivedResourcers AppM ServiceResource String where
archived = asks (tvarArchivedResourceToPeerMap . rpcEnv)
instance HasTransientResourcers AppM ServiceResource String where
transient = asks (tvarDynamicResourceToPeerMap . rpcEnv)
instance HasPRT AppM where
getPeerReputationHistoryTableTVar = asks (tvPeerReputationHashTable . prtEnv)
......@@ -96,10 +93,13 @@ instance HasCache (P2PEnv r t rmsg pmsg) pmsg where
cache = pubSubCache . psEnv
instance HasTopicHandlers (P2PEnv r t rmsg pmsg) t pmsg where
topicHandlers = pubSubHandlers . psEnv
instance HasPubSubEnv (P2PEnv ServiceResource ByteString String ByteString) ByteString ByteString where
instance HasPubSubEnv (P2PEnv ServiceResource ServiceTopic String String) ServiceTopic String where
pubSubEnv = psEnv
runAppM :: P2PEnv ServiceResource ByteString String ByteString-> AppM a -> LoggingT IO a
instance HasRpcEnv (P2PEnv r t rmsg pmsg) r rmsg where
rpcEnv = rEnv
runAppM :: P2PEnv ServiceResource ServiceTopic String String-> AppM a -> LoggingT IO a
runAppM = flip runReaderT
defaultConfig :: FilePath -> IO ()
......@@ -123,31 +123,17 @@ defaultConfig path = do
runNode :: String -> IO ()
runNode configPath = do
config <- Config.readConfig configPath
env <- mkP2PEnv config
let resourceHandlersNew = ResourceHandlers (HM.insert HelloWorld handlerNew HM.empty)
let topicHandlersNew = TopicHandlers (HM.insert HelloWorldHeader handlerTopic HM.empty)
env <- mkP2PEnv config resourceHandlersNew topicHandlersNew
runFileLoggingT (toS $ Config.logFile config) $
runAppM
env
(do
let resourceHandlers = HM.insert HelloWorld handler HM.empty
initP2P config resourceHandlers
-- tid' <-
-- async
-- (runUdpServer
-- (show (Config.udpPort config))
-- newIncomingConnectionHandler)
-- tid <-
-- async
-- (runTcpServer
-- (show (Config.tcpPort config))
-- newIncomingConnectionHandler)
-- liftIO $ threadDelay 1000000
-- void $ async (loadDefaultPeers (Config.trustedPeers config))
initP2P config
liftIO $ threadDelay 5000000
-- -- registerHelloWorld
-- -- liftIO $ threadDelay 3000000
stuffPublisher
-- getHelloWorld
-- liftIO $ threadDelay 3000000
getHelloWorld
liftIO $ threadDelay 500000000
)
......
{-# LANGUAGE MonoLocalBinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RankNTypes #-}
module Service.HelloWorld
( module Service.HelloWorld
) where
import Arivi.P2P.P2PEnv
import Arivi.P2P.RPC.Functions
import Arivi.P2P.RPC.Fetch
import Arivi.P2P.RPC.Types
import Arivi.P2P.Types
import Arivi.P2P.PubSub.Types
import Arivi.P2P.PubSub.Publish
import GHC.Generics
import Codec.Serialise
......@@ -20,11 +24,19 @@ type ServiceMsg = Lazy.ByteString
data ServiceResource = HelloWorld deriving (Eq, Ord, Show, Generic)
data ServiceTopic = HelloWorldHeader deriving (Eq, Ord, Show, Generic)
instance Serialise ServiceResource
instance Hashable ServiceResource
handler :: ResourceHandler ServiceResource String
handler = ResourceHandler (\(RpcPayload resource serviceMsg) -> RpcPayload resource (serviceMsg ++ "Praise Jesus"))
instance Serialise ServiceTopic
instance Hashable ServiceTopic
handlerNew :: ResourceHandler String
handlerNew = ResourceHandler (++ "Praise Jesus")
handlerTopic :: TopicHandler String
handlerTopic = TopicHandler (\msg -> if msg == "HelloworldHeader" then Ok else Error)
-- registerHelloWorld :: (HasP2PEnv env m ServiceResource String String String) => m ()
-- registerHelloWorld =
......@@ -32,8 +44,12 @@ handler = ResourceHandler (\(RpcPayload resource serviceMsg) -> RpcPayload resou
-- liftIO (threadDelay 5000000) >>
-- updatePeerInResourceMap HelloWorld
getHelloWorld :: (HasP2PEnv env m ServiceResource ByteString String ByteString) => m ()
getHelloWorld :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => m ()
getHelloWorld = do
resource <- fetchResource (RpcPayload HelloWorld "HelloWorld")
liftIO $ print "here"
liftIO $ print resource
stuffPublisher :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => m ()
stuffPublisher = publish (PubSubPayload (HelloWorldHeader, "HelloworldHeader"))
\ No newline at end of file