...
 
Commits (2)
......@@ -85,7 +85,8 @@ library
Arivi.P2P.ServiceRegistry
Arivi.P2P.Types
Arivi.P2P.LevelDB
Arivi.P2P.RPC.Functions
Arivi.P2P.RPC.Env
Arivi.P2P.RPC.Fetch
Arivi.P2P.RPC.Types
other-modules:
Arivi.P2P.Connection
......
......@@ -24,6 +24,7 @@ import Arivi.P2P.Kademlia.LoadDefaultPeers
import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.PubSub.Env
import Arivi.P2P.PubSub.Class
import Arivi.P2P.RPC.Env
import Control.Concurrent.Async.Lifted (async, wait)
import Control.Monad.Logger
......@@ -65,12 +66,6 @@ instance HasNetworkConfig (P2PEnv r t rmsg pmsg) NetworkConfig where
})
(f ((PE._networkConfig . nodeEndpointEnv) p2p))
instance HasArchivedResourcers AppM ByteString ByteString where
archived = asks (tvarArchivedResourceToPeerMap . rpcEnv)
instance HasTransientResourcers AppM ByteString ByteString where
transient = asks (tvarDynamicResourceToPeerMap . rpcEnv)
instance HasPRT AppM where
getPeerReputationHistoryTableTVar = asks (tvPeerReputationHashTable . prtEnv)
......@@ -94,6 +89,9 @@ instance HasTopicHandlers (P2PEnv r t rmsg pmsg) t pmsg where
instance HasPubSubEnv (P2PEnv r t rmsg pmsg) t pmsg where
pubSubEnv = psEnv
instance HasRpcEnv (P2PEnv r t rmsg pmsg) r rmsg where
rpcEnv = rEnv
runAppM :: P2PEnv ByteString ByteString ByteString ByteString-> AppM a -> LoggingT IO a
runAppM = flip runReaderT
......@@ -133,7 +131,7 @@ defaultConfig path = do
runNode :: String -> IO ()
runNode configPath = do
config <- Config.readConfig configPath
env <- mkP2PEnv config
env <- mkP2PEnv config undefined undefined
runFileLoggingT (toS $ Config.logFile config) $
-- runStdoutLoggingT $
runAppM
......@@ -165,7 +163,7 @@ runNode configPath = do
runBSNode :: String -> IO ()
runBSNode configPath = do
config <- Config.readConfig configPath
env <- mkP2PEnv config
env <- mkP2PEnv config undefined undefined
runFileLoggingT (toS $ Config.logFile config) $
-- runStdoutLoggingT $
runAppM
......
......@@ -10,27 +10,23 @@ import Arivi.Network
import qualified Arivi.P2P.Config as Config
import Arivi.P2P.PeerMaintainer
import Arivi.P2P.Kademlia.LoadDefaultPeers
import Arivi.P2P.Kademlia.LoadReputedPeers
import Arivi.P2P.PRT.Instance (getAllReputedNodes)
import Arivi.P2P.RPC.Types
import Arivi.P2P.RPC.Functions
-- import Arivi.P2P.Kademlia.LoadReputedPeers
-- import Arivi.P2P.PRT.Instance (getAllReputedNodes)
import qualified Data.HashMap.Strict as HM
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted
import Control.Monad.Except
-- | Called by the service in Xoken core
initP2P :: (HasP2PEnv env m r t rmsg pmsg) => Config.Config -> HM.HashMap r (ResourceHandler r rmsg) -> m ()
initP2P config resourceHandlers = do
initP2P :: (HasP2PEnv env m r t rmsg pmsg) => Config.Config -> m ()
initP2P config = do
_ <- async (runUdpServer (show (Config.udpPort config)) newIncomingConnectionHandler)
_ <- async (runTcpServer (show (Config.tcpPort config)) newIncomingConnectionHandler)
loadDefaultPeers (Config.trustedPeers config)
liftIO $ threadDelay 5000000
reputedNodes <- getAllReputedNodes
loadReputedPeers reputedNodes -- What do I do with an error. Doing nothing for now
mapM_ (\(resource, handler) -> registerResource resource handler Archived) (HM.toList resourceHandlers)
-- reputedNodes <- getAllReputedNodes
-- loadReputedPeers reputedNodes -- What do I do with an error. Doing nothing for now
_ <- async $ fillQuotas 5 -- hardcoding here assuming each resource requires 5 peers
return ()
-- wait tcpTid
......
......@@ -38,7 +38,7 @@ processRequest connHandle p2pMessage peerNodeId = do
Rpc -> serialise <$> rpc hh (deserialise $ payload p2pMessage)
Kademlia -> serialise <$> kademlia hh (deserialise $ payload p2pMessage)
Option -> serialise <$> option hh
PubSub p -> pubsub hh undefined p (payload p2pMessage)
PubSub p -> pubsub hh peerNodeId p (payload p2pMessage)
let p2pResponse = generateP2PMessage (uuid p2pMessage) (messageType p2pMessage) responseMsg
res <- LE.try $ send connHandle (serialise p2pResponse)
case res of
......
......@@ -18,18 +18,16 @@ import qualified Arivi.P2P.Kademlia.Types as T
import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.PubSub.Env
import Arivi.P2P.RPC.Types
import Arivi.P2P.RPC.Env
import Arivi.P2P.PRT.Types
import Arivi.Utils.Logging
import Arivi.Utils.Statsd
import Codec.Serialise
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Concurrent.STM (TVar, newTVarIO)
-- import Control.Lens.TH
import Data.ByteString.Lazy (ByteString)
import Data.HashMap.Strict as HM
import Data.Hashable
import Data.Ratio (Rational, (%))
-- |Upon writing services, we might discover that topic (t) and resource (r)
......@@ -40,7 +38,7 @@ type HasP2PEnv env m r t rmsg pmsg
= ( HasNodeEndpoint m
, HasLogging m
, HasPubSub env t pmsg
, HasRpc m r rmsg
, HasRpc env r rmsg
, HasKbucket m
, HasStatsdClient m
, HasPRT m
......@@ -60,17 +58,6 @@ mkNodeEndpoint nc handlers ne = do
peerMap <- newTVarIO HM.empty
return $ NodeEndpointEnv nc peerMap handlers ne
data RpcEnv r m = RpcEnv {
tvarArchivedResourceToPeerMap :: TVar (ArchivedResourceToPeerMap r m)
, tvarDynamicResourceToPeerMap :: TVar (TransientResourceToPeerMap r m)
}
mkRpcEnv :: IO (RpcEnv r m)
mkRpcEnv =
RpcEnv <$> newTVarIO (ArchivedResourceToPeerMap HM.empty)
<*> newTVarIO (TransientResourceToPeerMap HM.empty)
data KademliaEnv = KademliaEnv {
kbucket :: T.Kbucket Int [T.Peer]
}
......@@ -88,7 +75,7 @@ mkKademlia NetworkConfig{..} sbound pingThreshold kademliaConcurrencyFactor hopB
data P2PEnv r t rmsg pmsg = P2PEnv {
nodeEndpointEnv :: NodeEndpointEnv
, rpcEnv :: RpcEnv r rmsg
, rEnv :: RpcEnv r rmsg
, psEnv :: PubSubEnv t pmsg
, kademliaEnv :: KademliaEnv
, statsdClient :: StatsdClient
......@@ -101,20 +88,6 @@ class (HasSecretKey m) => HasNodeEndpoint m where
getHandlers :: m Handlers
getNodeIdPeerMapTVarP2PEnv :: m (TVar NodeIdPeerMap)
type HasRpc m r msg
= ( HasArchivedResourcers m r msg
, HasTransientResourcers m r msg
, Serialise r
, Serialise msg
, Hashable r
, Eq r
)
class HasArchivedResourcers m r msg | m -> r msg where
archived :: m (TVar (ArchivedResourceToPeerMap r msg))
class HasTransientResourcers m r msg | m -> r msg where
transient :: m (TVar (TransientResourceToPeerMap r msg))
data PRTEnv = PRTEnv {
tvPeerReputationHashTable :: TVar PeerReputationHistoryTable
......@@ -140,76 +113,9 @@ mkPRTEnv = do
kClosestVsRandomTVar <- newTVarIO (1 % 1 :: Rational)
return (PRTEnv peerReputationHashTable servicesReputationHashMapTVar p2pReputationHashMapTVar reputedVsOtherTVar kClosestVsRandomTVar)
-- class HasPubSub where
-- getWatcherTableP2PEnv :: TVar WatchersTable
-- getNotifiersTableP2PEnv :: TVar NotifiersTable
-- getTopicHandlerMapP2PEnv :: TVar TopicHandlerMap
-- getMessageHashMapP2PEnv :: TVar MessageHashMap
-- class (T.HasKbucket m, HasStatsdClient m, HasNetworkEnv m, HasSecretKey m) =>
-- HasP2PEnv m
-- where
-- getP2PEnv :: m P2PEnv
-- getAriviTVarP2PEnv :: m NetworkConfig
-- getNodeIdPeerMapTVarP2PEnv :: m (TVar NodeIdPeerMap)
-- getArchivedResourceToPeerMapP2PEnv :: m (TVar ArchivedResourceToPeerMap)
-- getMessageTypeMapP2PEnv :: m Handlers
-- getWatcherTableP2PEnv :: m (TVar WatchersTable)
-- getNotifiersTableP2PEnv :: m (TVar NotifiersTable)
-- getTopicHandlerMapP2PEnv :: m (TVar TopicHandlerMap)
-- getMessageHashMapP2PEnv :: m (TVar MessageHashMap)
-- getTransientResourceToPeerMap :: m (TVar TransientResourceToPeerMap)
-- makeP2PEnvironment ::
-- NetworkConfig
-- -> Int
-- -> Int
-- -> Int
-- -> IO (P2PEnv r)
-- makeP2PEnvironment nc@NetworkConfig{..} sbound pingThreshold kademliaConcurrencyFactor = do
-- r2pmap <- newTVarIO (ArchivedResourceToPeerMap HM.empty)
-- dr2pmap <- newTVarIO (TransientResourceToPeerMap HM.empty)
-- let rpcEnv = RpcEnv r2pmap dr2pmap
-- kb <-
-- createKbucket
-- (T.Peer (_nodeId, T.NodeEndPoint _ip _tcpPort _udpPort))
-- sbound
-- pingThreshold
-- kademliaConcurrencyFactor
-- let kademliaEnv = KademliaEnv kb
-- -- watcherMap <- newTVarIO HM.empty
-- -- notifierMap <- newTVarIO HM.empty
-- -- topicHandleMap <- newTVarIO HM.empty
-- -- messageMap <- newTVarIO HM.empty
-- -- let pubSubEnv = PubSubEnv
-- return P2PEnv {
-- rpcEnv = rpcEnv
-- , kademliaEnv = kademliaEnv
-- -- , pubSubEnv :: PubSubEnv
-- }
-- P2PEnv
-- { _networkConfig = nc
-- , tvarNodeIdPeerMap = nmap
-- , tvarArchivedResourceToPeerMap = r2pmap
-- , tvarWatchersTable = watcherMap
-- , tvarNotifiersTable = notifierMap
-- , tvarTopicHandlerMap = topicHandleMap
-- , tvarMessageHashMap = messageMap
-- , tvarDynamicResourceToPeerMap = dr2pmap
-- , kbucket = kb
-- }
-- data Handlers = Handlers
-- { rpc :: forall m r t msg. (HasRpc m r, HasNodeEndpoint m, Eq r, Hashable r, Serialise r, MonadIO m) => Request t msg -> m (Response t msg)
-- , kademlia :: forall m t msg. (HasKbucket m, Serialise msg) => Request t msg -> m (Response t msg)
-- }
data Handlers = Handlers {
rpc :: forall m r msg. (HasNodeEndpoint m, HasRpc m r msg, MonadIO m) => Request 'Rpc (RpcPayload r msg) -> m (Response 'Rpc (RpcPayload r msg))
rpc :: forall env m r msg. (MonadReader env m, HasNodeEndpoint m, HasRpc env r msg, MonadIO m) => Request 'Rpc (RpcPayload r msg) -> m (Response 'Rpc (RpcPayload r msg))
, kademlia :: forall env m r t rmsg pmsg. (HasP2PEnv env m r t rmsg pmsg) => Request 'Kademlia T.PayLoad -> m (Response 'Kademlia T.PayLoad)
, option :: forall m r msg. (HasNodeEndpoint m, HasRpc m r msg, MonadIO m) => m (Response 'Option (Supported [r]))
, pubsub :: forall env m t msg. (MonadReader env m, HasPubSub env t msg, MonadIO m) => NodeId -> PubSub -> ByteString -> m ByteString
, option :: forall env m r msg. (MonadReader env m, HasNodeEndpoint m, HasRpc env r msg, MonadIO m) => m (Response 'Option (Supported [r]))
, pubsub :: forall env m r t rmsg pmsg. (HasP2PEnv env m r t rmsg pmsg, MonadIO m) => NodeId -> PubSub -> ByteString -> m ByteString
}
......@@ -9,45 +9,61 @@ import Arivi.P2P.Types
import Arivi.P2P.MessageHandler.HandlerTypes
import qualified Arivi.P2P.Kademlia.Types as KademliaTypes
import Arivi.P2P.PRT.Instance (getKNodes)
import Arivi.P2P.PubSub.Class
import Arivi.P2P.PubSub.Types
import Arivi.P2P.PubSub.Subscribe
import Arivi.P2P.RPC.Types
import Arivi.P2P.RPC.Env
import Arivi.P2P.RPC.SendOptions
import qualified Data.HashMap.Strict as HM
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted (mapConcurrently_)
import Control.Concurrent.STM
import Control.Monad.Reader
import Control.Monad.Except (runExceptT)
-- import Control.Monad (unless, forever)
import Control.Lens
import Data.Set as Set
-- | Sends subscribe messages for each topic to every passed peer.
-- | TODO: Batched subscribes for multiple topics.
sendSubscribes :: (HasP2PEnv env m r t rmsg pmsg) => [NodeId] -> m ()
sendSubscribes nodeList = do
topicList <- asks topics
mapM_ (subscribeForTopic nodeList) (Set.toList topicList)
subscribeForTopic :: (HasP2PEnv env m r t rmsg msg) => [NodeId] -> t -> m ()
subscribeForTopic nodeList t = mapConcurrently_ (subscribe (PubSubPayload (t, 100000))) nodeList -- Hardcoding timer value for now. No logic for handling it currently. Subscriptions are for the duration of a network connection as of now
-- | fills up the peer list for resource. Since Options message is not for a specific resource, check after each invocation of sendOptions if the number of peers if less than required quota for any resource. Recursively keep calling till all the quotas have been satisfied.
-- | TODO: Logging
fillQuotas :: (HasP2PEnv env m r t rmsg pmsg) => Integer -> m ()
fillQuotas numPeers = forever $ do
archivedMapTVar <- archived
archivedMap <- liftIO (readTVarIO archivedMapTVar)
filled <- liftIO $ isFilled archivedMap numPeers
unless filled $ do
rpcRecord <- asks rpcEnv
let Resourcers resourcers = rpcResourcers rpcRecord
filledResources <- liftIO $ isFilled resourcers numPeers
Notifiers notif <- asks notifiers
_ <- liftIO $ isFilled notif numPeers
unless filledResources $ do
res <- runExceptT $ getKNodes numPeers -- Repetition of peers
-- liftIO $ threadDelay (40 * 1000000)
case res of
Left _ -> liftIO $ threadDelay (40 * 1000000)
Right peers -> do
peerNodeIds <- addPeerFromKademlia peers
sendOptionsMessage peerNodeIds Options
liftIO $ print "waiting"
liftIO $ threadDelay (40 * 1000000)
-- fillQuotas numPeers
sendSubscribes peerNodeIds
liftIO $ threadDelay (40 * 1000000)
isFilledHelper ::Int -> [TVar [a]] -> IO Bool
isFilledHelper ::Int -> [TVar (Set a)] -> IO Bool
isFilledHelper _ [] = return True
isFilledHelper minimumNodes l = not <$> (fmap (any (< minimumNodes)) <$> mapM (fmap length <$> readTVarIO)) l
isFilledHelper minimumNodes l = not <$> (fmap (any (< minimumNodes)) <$> mapM (fmap Set.size <$> readTVarIO)) l
-- | Returns true if all the resources have met the minimumNodes quota and false otherwise
isFilled :: ArchivedResourceToPeerMap r msg -> Integer -> IO Bool
isFilled archivedMap minNodes = do
let resourceToPeerList = HM.toList (getArchivedMap archivedMap)
isFilledHelper (fromIntegral minNodes) (fmap (snd . snd) resourceToPeerList)
isFilled :: HM.HashMap a (TVar (Set b)) -> Integer -> IO Bool
isFilled hm minNodes = do
let l = HM.toList hm
isFilledHelper (fromIntegral minNodes) (fmap snd l)
-- | add the peers returned by Kademlia to the PeerDetails HashMap
addPeerFromKademlia ::
......
......@@ -51,14 +51,17 @@ type HasPubSub env t msg
)
mkPubSub :: IO (PubSubEnv t msg)
mkPubSub =
PubSubEnv <$> pure Set.empty
<*> pure (Subscribers HM.empty)
<*> pure (Notifiers HM.empty)
mkPubSub :: (Ord t, Hashable t) => TopicHandlers t msg -> IO (PubSubEnv t msg)
mkPubSub (TopicHandlers h) = do
let topicList = HM.keys h
subTVars <- mapM (\_ -> newTVarIO Set.empty) topicList
notifTVars <- mapM (\_ -> newTVarIO Set.empty) topicList
PubSubEnv <$> pure (Set.fromList topicList)
<*> pure (Subscribers (HM.fromList (zip topicList subTVars)))
<*> pure (Notifiers (HM.fromList (zip topicList notifTVars)))
<*> newTVarIO (Inbox HM.empty)
<*> newTVarIO (Cache HM.empty)
<*> pure (TopicHandlers HM.empty)
<*> pure (TopicHandlers h)
instance HasTopics (PubSubEnv t msg) t where
topics = pubSubTopics
......
......@@ -9,9 +9,10 @@ module Arivi.P2P.PubSub.Handler
import Arivi.P2P.MessageHandler.HandlerTypes (NodeId)
import Arivi.P2P.Types
import Arivi.P2P.P2PEnv
import Arivi.P2P.PubSub.Class
import Arivi.P2P.PubSub.Env
import Arivi.P2P.PubSub.Types
import Arivi.P2P.PubSub.Notify
import Codec.Serialise
import Control.Concurrent.MVar
......@@ -27,9 +28,7 @@ import qualified Data.Set as Set
-- in a separate thread.
pubSubHandler ::
( MonadReader env m
, HasPubSub env t msg
, MonadIO m
( HasP2PEnv env m r t rmsg pmsg
)
=> NodeId
-> PubSub
......@@ -40,28 +39,34 @@ pubSubHandler nid Publish req = serialise <$> publishHandler nid (deserialise re
pubSubHandler nid Notify req = serialise <$> notifyHandler nid (deserialise req)
notifyHandler ::
( MonadReader env m
, HasPubSub env t msg, MonadIO m)
( HasP2PEnv env m r t rmsg pmsg)
=> NodeId
-> Request ('PubSub 'Notify) (PubSubPayload t msg)
-> m (Response ('PubSub 'Notify) (PubSubPayload t msg))
notifyHandler nid (PubSubRequest (PubSubPayload (t, msg))) = do
-> Request ('PubSub 'Notify) (PubSubPayload t pmsg)
-> m (Response ('PubSub 'Notify) Status)
notifyHandler nid (PubSubRequest payload@(PubSubPayload (t, msg))) = do
inboxed <- asks inbox
cached <- asks cache
handlers <- asks topicHandlers
(PubSubResponse . PubSubPayload . (t, )) <$> handleTopic nid inboxed cached handlers t msg
h <- asks topicHandlers
resp <- handleTopic nid inboxed cached h t msg
case resp of
Ok -> notify payload
Error -> return ()
return (PubSubResponse resp)
publishHandler ::
( MonadReader env m
, HasPubSub env t msg, MonadIO m)
( HasP2PEnv env m r t rmsg pmsg)
=> NodeId
-> Request ('PubSub 'Publish) (PubSubPayload t msg)
-> m (Response ('PubSub 'Publish) (PubSubPayload t msg))
publishHandler nid (PubSubRequest (PubSubPayload (t, msg))) = do
-> Request ('PubSub 'Publish) (PubSubPayload t pmsg)
-> m (Response ('PubSub 'Publish) Status)
publishHandler nid (PubSubRequest payload@(PubSubPayload (t, msg))) = do
inboxed <- asks inbox
cached <- asks cache
handlers <- asks topicHandlers
(PubSubResponse . PubSubPayload . (t, )) <$> handleTopic nid inboxed cached handlers t msg
h <- asks topicHandlers
resp <- handleTopic nid inboxed cached h t msg
case resp of
Ok -> notify payload
Error -> return ()
return (PubSubResponse resp)
subscribeHandler ::
( MonadReader env m
......@@ -86,8 +91,8 @@ handleTopic ::
-> TopicHandlers t msg
-> t
-> msg
-> m msg
handleTopic nid inboxed cached (TopicHandlers handlers) t msg = do
-> m Status
handleTopic nid inboxed cached (TopicHandlers hs) t msg = do
-- Add node to the inbox
Inbox inbox' <- liftIO $ readTVarIO inboxed
case inbox' ^. at msg of
......@@ -109,9 +114,9 @@ handleTopic nid inboxed cached (TopicHandlers handlers) t msg = do
modifyTVar
cached
(\(Cache c) -> Cache (c & at msg ?~ def))
case handlers ^. at t of
case hs ^. at t of
Just (TopicHandler h) -> do
resp <- h msg
let resp = h msg
liftIO $ putMVar def resp
return resp
Nothing -> error "Shouldn't reach here"
......@@ -36,7 +36,7 @@ import Data.Time.Clock
import GHC.Generics (Generic)
newtype TopicHandler msg =
TopicHandler (forall m . msg -> m msg)
TopicHandler (msg -> Status)
type Timer = Integer
......@@ -51,7 +51,7 @@ newtype Notifiers t = Notifiers (HM.HashMap t (TVar (Set NodeId)))
newtype Inbox msg = Inbox (HM.HashMap msg (TVar (Set NodeId)))
newtype Cache msg = Cache (HM.HashMap msg (MVar msg))
newtype Cache msg = Cache (HM.HashMap msg (MVar Status))
newtype TopicHandlers t msg = TopicHandlers (HM.HashMap t (TopicHandler msg))
......@@ -132,7 +132,8 @@ newNotifier ::
-> IO ()
newNotifier nid (Notifiers notifs) t =
case notifs ^. at t of
Just x -> atomically $ modifyTVar x (Set.insert nid)
Just x ->
atomically $ modifyTVar x (Set.insert nid)
-- |Invariant this branch is never reached.
-- 'initPubSub' should statically make empty
-- sets for all topics in the map.
......
......@@ -6,21 +6,26 @@ import Arivi.Env
import qualified Arivi.P2P.Config as Config
import Arivi.P2P.PubSub.Handler
import Arivi.P2P.PubSub.Env
import Arivi.P2P.PubSub.Types
import Arivi.P2P.Kademlia.MessageHandler
import Arivi.P2P.P2PEnv
import Arivi.P2P.RPC.Handler
import Arivi.P2P.RPC.Env
import Arivi.P2P.RPC.Types
import Arivi.P2P.Types
import Arivi.Utils.Statsd
import Data.Hashable
mkHandlers :: Handlers
mkHandlers = Handlers rpcHandler kademliaMessageHandler optionsHandler pubSubHandler
mkP2PEnv :: Config.Config -> IO (P2PEnv r t rmsg pmsg)
mkP2PEnv config = do
mkP2PEnv :: (Ord t, Hashable t, Ord r, Hashable r) => Config.Config -> ResourceHandlers r rmsg-> TopicHandlers t pmsg -> IO (P2PEnv r t rmsg pmsg)
mkP2PEnv config rh th = do
let nc = NetworkConfig (Config.myNodeId config) (Config.myIp config) (Config.tcpPort config) (Config.udpPort config)
let networkEnv = mkAriviEnv (read $ show $ Config.tcpPort config) (read $ show $ Config.udpPort config) (Config.secretKey config)
P2PEnv <$> mkNodeEndpoint nc mkHandlers networkEnv <*> mkRpcEnv <*> mkPubSub <*> mkKademlia nc (Config.sbound config) (Config.pingThreshold config) (Config.kademliaConcurrencyFactor config) (Config.hopBound config) <*> createStatsdClient "127.0.0.1" 8080 "statsdPrefix" <*> mkPRTEnv
P2PEnv <$> mkNodeEndpoint nc mkHandlers networkEnv <*> mkRpc rh <*> mkPubSub th <*> mkKademlia nc (Config.sbound config) (Config.pingThreshold config) (Config.kademliaConcurrencyFactor config) (Config.hopBound config) <*> createStatsdClient "127.0.0.1" 8080 "statsdPrefix" <*> mkPRTEnv
-- makeP2Pinstance ::
-- NodeId
......
......@@ -26,7 +26,10 @@ import Arivi.P2P.Handler (newIncomingConnectionHandler)
import Arivi.P2P.Kademlia.LoadDefaultPeers
import Arivi.P2P.MessageHandler.HandlerTypes
import Arivi.P2P.PubSub.Env
import Arivi.P2P.RPC.Env
import Arivi.P2P.RPC.Types
import Arivi.P2P.PubSub.Class
import Arivi.P2P.PubSub.Types
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async.Lifted (async, wait)
......@@ -41,7 +44,7 @@ import Data.Text
import System.Directory (doesPathExist)
import System.Environment (getArgs)
type AppM = ReaderT (P2PEnv ServiceResource ByteString String ByteString) (LoggingT IO)
type AppM = ReaderT (P2PEnv ServiceResource ServiceTopic String String) (LoggingT IO)
instance HasNetworkEnv AppM where
getEnv = asks (ariviNetworkEnv . nodeEndpointEnv)
......@@ -70,12 +73,6 @@ instance HasNetworkConfig (P2PEnv r t rmsg pmsg) NetworkConfig where
})
(f ((PE._networkConfig . nodeEndpointEnv) p2p))
instance HasArchivedResourcers AppM ServiceResource String where
archived = asks (tvarArchivedResourceToPeerMap . rpcEnv)
instance HasTransientResourcers AppM ServiceResource String where
transient = asks (tvarDynamicResourceToPeerMap . rpcEnv)
instance HasPRT AppM where
getPeerReputationHistoryTableTVar = asks (tvPeerReputationHashTable . prtEnv)
......@@ -96,10 +93,13 @@ instance HasCache (P2PEnv r t rmsg pmsg) pmsg where
cache = pubSubCache . psEnv
instance HasTopicHandlers (P2PEnv r t rmsg pmsg) t pmsg where
topicHandlers = pubSubHandlers . psEnv
instance HasPubSubEnv (P2PEnv ServiceResource ByteString String ByteString) ByteString ByteString where
instance HasPubSubEnv (P2PEnv ServiceResource ServiceTopic String String) ServiceTopic String where
pubSubEnv = psEnv
runAppM :: P2PEnv ServiceResource ByteString String ByteString-> AppM a -> LoggingT IO a
instance HasRpcEnv (P2PEnv r t rmsg pmsg) r rmsg where
rpcEnv = rEnv
runAppM :: P2PEnv ServiceResource ServiceTopic String String-> AppM a -> LoggingT IO a
runAppM = flip runReaderT
defaultConfig :: FilePath -> IO ()
......@@ -123,31 +123,17 @@ defaultConfig path = do
runNode :: String -> IO ()
runNode configPath = do
config <- Config.readConfig configPath
env <- mkP2PEnv config
let resourceHandlersNew = ResourceHandlers (HM.insert HelloWorld handlerNew HM.empty)
let topicHandlersNew = TopicHandlers (HM.insert HelloWorldHeader handlerTopic HM.empty)
env <- mkP2PEnv config resourceHandlersNew topicHandlersNew
runFileLoggingT (toS $ Config.logFile config) $
runAppM
env
(do
let resourceHandlers = HM.insert HelloWorld handler HM.empty
initP2P config resourceHandlers
-- tid' <-
-- async
-- (runUdpServer
-- (show (Config.udpPort config))
-- newIncomingConnectionHandler)
-- tid <-
-- async
-- (runTcpServer
-- (show (Config.tcpPort config))
-- newIncomingConnectionHandler)
-- liftIO $ threadDelay 1000000
-- void $ async (loadDefaultPeers (Config.trustedPeers config))
initP2P config
liftIO $ threadDelay 5000000
-- -- registerHelloWorld
-- -- liftIO $ threadDelay 3000000
stuffPublisher
-- getHelloWorld
-- liftIO $ threadDelay 3000000
getHelloWorld
liftIO $ threadDelay 500000000
)
......
{-# LANGUAGE MonoLocalBinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RankNTypes #-}
module Service.HelloWorld
( module Service.HelloWorld
) where
import Arivi.P2P.P2PEnv
import Arivi.P2P.RPC.Functions
import Arivi.P2P.RPC.Fetch
import Arivi.P2P.RPC.Types
import Arivi.P2P.Types
import Arivi.P2P.PubSub.Types
import Arivi.P2P.PubSub.Publish
import GHC.Generics
import Codec.Serialise
......@@ -20,11 +24,19 @@ type ServiceMsg = Lazy.ByteString
data ServiceResource = HelloWorld deriving (Eq, Ord, Show, Generic)
data ServiceTopic = HelloWorldHeader deriving (Eq, Ord, Show, Generic)
instance Serialise ServiceResource
instance Hashable ServiceResource
handler :: ResourceHandler ServiceResource String
handler = ResourceHandler (\(RpcPayload resource serviceMsg) -> RpcPayload resource (serviceMsg ++ "Praise Jesus"))
instance Serialise ServiceTopic
instance Hashable ServiceTopic
handlerNew :: ResourceHandler String
handlerNew = ResourceHandler (++ "Praise Jesus")
handlerTopic :: TopicHandler String
handlerTopic = TopicHandler (\msg -> if msg == "HelloworldHeader" then Ok else Error)
-- registerHelloWorld :: (HasP2PEnv env m ServiceResource String String String) => m ()
-- registerHelloWorld =
......@@ -32,8 +44,12 @@ handler = ResourceHandler (\(RpcPayload resource serviceMsg) -> RpcPayload resou
-- liftIO (threadDelay 5000000) >>
-- updatePeerInResourceMap HelloWorld
getHelloWorld :: (HasP2PEnv env m ServiceResource ByteString String ByteString) => m ()
getHelloWorld :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => m ()
getHelloWorld = do
resource <- fetchResource (RpcPayload HelloWorld "HelloWorld")
liftIO $ print "here"
liftIO $ print resource
stuffPublisher :: (HasP2PEnv env m ServiceResource ServiceTopic String String) => m ()
stuffPublisher = publish (PubSubPayload (HelloWorldHeader, "HelloworldHeader"))
\ No newline at end of file