Commit 4462bb2e authored by Avinash Kumar's avatar Avinash Kumar

Adds logging to PubSub

parent 9cd97f03
Pipeline #30744198 failed with stage
in 0 seconds
......@@ -2,6 +2,7 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TemplateHaskell #-}
module Arivi.P2P.PubSub.Handler
( pubSubHandler
......@@ -23,6 +24,8 @@ import Control.Monad.Reader
import Data.Hashable
import Data.ByteString.Lazy
import qualified Data.Set as Set
import qualified Data.Text as T
import Control.Monad.Logger (logDebug)
-- |Handler functions might never halt, they should be run
-- in a separate thread.
......@@ -44,13 +47,18 @@ notifyHandler ::
-> Request ('PubSub 'Notify) (PubSubPayload t pmsg)
-> m (Response ('PubSub 'Notify) Status)
notifyHandler nid (PubSubRequest payload@(PubSubPayload (t, msg))) = do
$(logDebug) $ T.pack ("Notify received handler invoked")
inboxed <- asks inbox
cached <- asks cache
h <- asks topicHandlers
resp <- handleTopic nid inboxed cached h t msg
case resp of
Ok -> notify payload
Error -> return ()
Ok -> do
$(logDebug) $ T.pack ("handleTopic successful notifying subscribers")
notify payload
Error -> do
$(logDebug) $ T.pack ("handleTopic unsuccessful")
return ()
return (PubSubResponse resp)
publishHandler ::
......@@ -59,13 +67,18 @@ publishHandler ::
-> Request ('PubSub 'Publish) (PubSubPayload t pmsg)
-> m (Response ('PubSub 'Publish) Status)
publishHandler nid (PubSubRequest payload@(PubSubPayload (t, msg))) = do
$(logDebug) $ T.pack ("Publish received handler invoked")
inboxed <- asks inbox
cached <- asks cache
h <- asks topicHandlers
resp <- handleTopic nid inboxed cached h t msg
case resp of
Ok -> notify payload
Error -> return ()
case resp of
Ok -> do
$(logDebug) $ T.pack ("handleTopic successful notifying subscribers")
notify payload
Error -> do
$(logDebug) $ T.pack ("handleTopic unsuccessful")
return ()
return (PubSubResponse resp)
subscribeHandler ::
......@@ -76,14 +89,14 @@ subscribeHandler ::
-> Request ('PubSub 'Subscribe) (PubSubPayload t Timer)
-> m (Response ('PubSub 'Subscribe) Status)
subscribeHandler nid (PubSubRequest (PubSubPayload (t, subTimer))) = do
subs <- asks subscribers
tops <- asks topics
success <- liftIO $ newSubscriber nid subs tops subTimer t
if success
subs <- asks subscribers
tops <- asks topics
success <- liftIO $ newSubscriber nid subs tops subTimer t
if success
then return (PubSubResponse Ok)
else return (PubSubResponse Error)
handleTopic ::
handleTopic ::
(Eq t, Hashable t, Eq msg, Hashable msg, MonadIO m)
=> NodeId
-> TVar (Inbox msg)
......
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TemplateHaskell #-}
module Arivi.P2P.PubSub.Notify
( notify
......@@ -16,12 +17,15 @@ import Arivi.Utils.Set
import Control.Concurrent.STM.TVar (readTVarIO)
import Control.Monad.Except
import Control.Monad.Reader
import qualified Data.Text as T
import Control.Monad.Logger (logDebug)
notify ::
( HasP2PEnv env m r t rmsg msg)
=> PubSubPayload t msg
-> m ()
notify req@(PubSubPayload (t, msg)) = do
$(logDebug) $ T.pack ("notify called")
subs <- asks subscribers
inboxed <- join $ liftIO . readTVarIO <$> asks inbox
peers <- liftIO $ notifiersForMessage inboxed subs msg t
......@@ -33,8 +37,12 @@ notify req@(PubSubPayload (t, msg)) = do
traverseSet
(\case
Left _ -> return ()
Right (PubSubResponse Ok) -> return ()
Right (PubSubResponse Error) -> return ())
Right (PubSubResponse Ok) -> do
$(logDebug) $ T.pack ("otify Successful")
return ()
Right (PubSubResponse Error) -> do
$(logDebug) $ T.pack ("Notify failed")
return ())
responses
notifyRequest :: msg -> Request ('PubSub 'Notify) msg
......
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE TemplateHaskell #-}
module Arivi.P2P.PubSub.Publish
( publish
......@@ -17,9 +18,12 @@ import Control.Applicative
import Control.Monad.Except
import Control.Monad.Reader
import Data.Set (union)
import qualified Data.Text as T
import Control.Monad.Logger (logDebug)
publish :: (HasP2PEnv env m r t rmsg msg) => PubSubPayload t msg -> m ()
publish req@(PubSubPayload (t,_)) = do
$(logDebug) $ T.pack ("publish called")
subs <- asks subscribers
notf <- asks notifiers
nodes <- liftA2 union (liftIO $ subscribersForTopic t subs) (liftIO $ notifiersForTopic t notf)
......@@ -30,8 +34,12 @@ publish req@(PubSubPayload (t,_)) = do
void $ traverseSet
(\case
Left _ -> return ()
Right (PubSubResponse Ok) -> return ()
Right (PubSubResponse Error) -> return ())
Right (PubSubResponse Ok) -> do
$(logDebug) $ T.pack ("Publish successful ")
return ()
Right (PubSubResponse Error) -> do
$(logDebug) $ T.pack ("Publish Failed")
return ())
responses
publishRequest :: msg -> Request ('PubSub 'Publish) msg
......
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TemplateHaskell #-}
module Arivi.P2P.PubSub.Subscribe
( subscribe
......@@ -15,16 +16,24 @@ import Arivi.P2P.Types
import Control.Monad.Except
import Control.Monad.Reader
import qualified Data.Text as T
import Control.Monad.Logger (logDebug)
subscribe ::
(HasP2PEnv env m r t rmsg msg) => PubSubPayload t Timer -> NodeId -> m ()
subscribe req@(PubSubPayload (t,_)) nid = do
$(logDebug) $ T.pack ("subscribe called")
resp <- runExceptT $ issueRequest nid (subscribeRequest req)
notf <- asks notifiers
case resp of
Left _ -> return ()
Right (PubSubResponse Ok) -> liftIO $ newNotifier nid notf t
Right (PubSubResponse Error) -> return ()
Right (PubSubResponse Ok) -> do
$(logDebug) $ T.pack ("Subscribe successful creating new Notifier")
liftIO $ newNotifier nid notf t
$(logDebug) $ T.pack ("created new Notifier with NodeId = " ++ show nid)
Right (PubSubResponse Error) -> do
$(logDebug) $ T.pack ("Subscribe failed")
return ()
subscribeRequest :: msg -> Request ('PubSub 'Subscribe) msg
subscribeRequest = PubSubRequest
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment