Commit 194793d0 authored by Heimdall's avatar Heimdall

37-issue rename mailbox

parent eb8b3c85
Pipeline #118110310 failed with stage
in 2 minutes and 11 seconds
......@@ -82,8 +82,6 @@ func parseFlags() (generalConf common.GeneralConfig, tssConf common.TssConfig, p
flag.StringVar(&generalConf.BaseFolder, "home", "", "home folder to store the keygen state file")
// we setup the Tss parameter configuration
flag.DurationVar(&tssConf.KeyGenTimeout, "gentimeout", 30*time.Second, "keygen timeout")
flag.DurationVar(&tssConf.KeySignTimeout, "signtimeout", 30*time.Second, "keysign timeout")
flag.DurationVar(&tssConf.PreParamTimeout, "preparamtimeout", 5*time.Minute, "pre-parameter generation timeout")
// we setup the p2p network configuration
......
......@@ -129,51 +129,7 @@ func getPeerIDFromPartyID(partyID *btss.PartyID) (peer.ID, error) {
// return nil
// }
//
// func (t *TssCommon) ProcessOutCh(msg btss.Message, msgType p2p.THORChainTSSMessageType) error {
// buf, r, err := msg.WireBytes()
// // if we cannot get the wire share, the tss keygen will fail, we just quit.
// if err != nil {
// return fmt.Errorf("fail to get wire bytes: %w", err)
// }
// wireMsg := p2p.WireMessage{
// Routing: r,
// RoundInfo: msg.Type(),
// Message: buf,
// }
// wireMsgBytes, err := json.Marshal(wireMsg)
// if err != nil {
// return fmt.Errorf("fail to convert tss msg to wire bytes: %w", err)
// }
// wrappedMsg := p2p.WrappedMessage{
// MessageType: msgType,
// MsgID: t.msgID,
// Payload: wireMsgBytes,
// }
// peerIDs := make([]peer.ID, 0)
// if len(r.To) == 0 {
// peerIDs = t.P2PPeers
// if len(peerIDs) == 0 {
// t.logger.Error().Msg("fail to get any peer ids")
// return errors.New("fail to get any peer id")
// }
//
// } else {
// for _, each := range r.To {
// peerID, ok := t.PartyIDtoP2PID[each.Id]
// if !ok {
// t.logger.Error().Msg("error in find the P2P ID")
// continue
// }
// peerIDs = append(peerIDs, peerID)
// }
// }
// t.renderToP2P(&p2p.BroadcastMsgChan{
// WrappedMessage: wrappedMsg,
// PeersID: peerIDs,
// })
//
// return nil
// }
//
// func (t *TssCommon) processVerMsg(broadcastConfirmMsg *p2p.BroadcastConfirmMessage) error {
// t.logger.Debug().Msg("process ver msg")
......@@ -227,115 +183,6 @@ func getPeerIDFromPartyID(partyID *btss.PartyID) (peer.ID, error) {
// return nil
// }
//
// // processTSSMsg
// func (t *TssCommon) processTSSMsg(wireMsg *p2p.WireMessage, msgType p2p.THORChainTSSMessageType) error {
// t.logger.Debug().Msg("process wire message")
// defer t.logger.Debug().Msg("finish process wire message")
// // we only update it local party
// if !wireMsg.Routing.IsBroadcast {
// t.logger.Debug().Msgf("msg from %s to %+v", wireMsg.Routing.From, wireMsg.Routing.To)
// return t.updateLocal(wireMsg)
// }
// // broadcast message , we save a copy locally , and then tell all others what we got
// msgHash, err := BytesToHashString(wireMsg.Message)
// if err != nil {
// return fmt.Errorf("fail to calculate hash of the wire message: %w", err)
// }
// partyInfo := t.getPartyInfo()
// key := wireMsg.GetCacheKey()
// // P2PID will be filled up by the receiver.
// broadcastConfirmMsg := &p2p.BroadcastConfirmMessage{
// P2PID: "",
// Key: key,
// Hash: msgHash,
// }
// localCacheItem := t.TryGetLocalCacheItem(key)
// if nil == localCacheItem {
// t.logger.Debug().Msgf("++%s doesn't exist yet,add a new one", key)
// localCacheItem = NewLocalCacheItem(wireMsg, msgHash)
// t.updateLocalUnconfirmedMessages(key, localCacheItem)
// } else {
// // this means we received the broadcast confirm message from other party first
// t.logger.Debug().Msgf("==%s exist", key)
// if localCacheItem.Msg == nil {
// t.logger.Debug().Msgf("==%s exist, set message", key)
// localCacheItem.Msg = wireMsg
// localCacheItem.Hash = msgHash
// }
// }
// localCacheItem.UpdateConfirmList(t.localPeerID, msgHash)
// if localCacheItem.TotalConfirmParty() == (len(partyInfo.PartyIDMap) - 1) {
// if err := t.updateLocal(localCacheItem.Msg); nil != err {
// return fmt.Errorf("fail to update the message to local party: %w", err)
// }
// }
// buf, err := json.Marshal(broadcastConfirmMsg)
// if err != nil {
// return fmt.Errorf("fail to marshal borad cast confirm message: %w", err)
// }
// t.logger.Debug().Msg("broadcast VerMsg to all other parties")
// peerIDs := t.P2PPeers
// if len(peerIDs) == 0 {
// t.logger.Error().Err(err).Msg("fail to get any peer ID")
// return errors.New("fail to get any peer ID")
// }
//
// p2prappedMSg := p2p.WrappedMessage{
// MessageType: getBroadcastMessageType(msgType),
// MsgID: t.msgID,
// Payload: buf,
// }
//
// t.renderToP2P(&p2p.BroadcastMsgChan{
// WrappedMessage: p2prappedMSg,
// PeersID: peerIDs,
// })
// return nil
// }
//
// func getBroadcastMessageType(msgType p2p.THORChainTSSMessageType) p2p.THORChainTSSMessageType {
// switch msgType {
// case p2p.TSSKeyGenMsg:
// return p2p.TSSKeyGenVerMsg
// case p2p.TSSKeySignMsg:
// return p2p.TSSKeySignVerMsg
// default:
// return p2p.Unknown // this should not happen
// }
// }
//
// func (t *TssCommon) TryGetLocalCacheItem(key string) *LocalCacheItem {
// t.unConfirmedMsgLock.Lock()
// defer t.unConfirmedMsgLock.Unlock()
// localCacheItem, ok := t.unConfirmedMessages[key]
// if !ok {
// return nil
// }
// return localCacheItem
// }
//
// func (t *TssCommon) TryGetAllLocalCached() []*LocalCacheItem {
// var localCachedItems []*LocalCacheItem
// t.unConfirmedMsgLock.Lock()
// defer t.unConfirmedMsgLock.Unlock()
// for _, value := range t.unConfirmedMessages {
// localCachedItems = append(localCachedItems, value)
// }
// return localCachedItems
// }
//
// func (t *TssCommon) updateLocalUnconfirmedMessages(key string, cacheItem *LocalCacheItem) {
// t.unConfirmedMsgLock.Lock()
// defer t.unConfirmedMsgLock.Unlock()
// t.unConfirmedMessages[key] = cacheItem
// }
//
// func (t *TssCommon) removeKey(key string) {
// t.unConfirmedMsgLock.Lock()
// defer t.unConfirmedMsgLock.Unlock()
// delete(t.unConfirmedMessages, key)
// }
func GetTssPubKey(pubKeyPoint *crypto.ECPoint) (string, types.AccAddress, error) {
if pubKeyPoint == nil {
return "", types.AccAddress{}, errors.New("invalid points")
......
......@@ -9,7 +9,6 @@ import (
)
const (
BlameHashCheck = "hash check failed"
BlameTssTimeout = "Tss timeout"
)
......
......@@ -106,7 +106,8 @@ func (kg *TssKeyGen) onMessageValidated(msg *p2p.WireMessage) {
Msg("message validated")
if _, err := pi.Party.UpdateFromBytes(msg.Message, msg.Routing.From, msg.Routing.IsBroadcast); err != nil {
kg.logger.Error().Err(err).Msg("fail to update local party")
// get who to blame
// TODO get who to blame
}
}
......@@ -149,8 +150,7 @@ func (kg *TssKeyGen) GenerateNewKey(keygenReq Request, messageID string) (*crypt
}
}()
r, err := kg.processKeyGen(errChan, outCh, endCh, keyGenLocalStateItem, messageID)
return r, err
return kg.processKeyGen(errChan, outCh, endCh, keyGenLocalStateItem, messageID)
}
func (kg *TssKeyGen) setPartyInfo(partyInfo *common.PartyInfo) {
......@@ -183,10 +183,10 @@ func (kg *TssKeyGen) processKeyGen(errChan chan struct{},
return nil, errors.New("keygen party fail to start")
case <-kg.stopChan: // when TSS processor receive signal to quit
return nil, errors.New("received exit signal")
case <-time.After(keyGenTimeout):
kg.logger.Error().Msgf("fail to generate message with %s", keyGenTimeout)
// be aware this timeout only means how long we wait for the next message to be emit by the local party.
// this is not the overall keygen timeout
kg.logger.Error().Msgf("fail to generate key within %s", keyGenTimeout)
return nil, common.ErrTssTimeOut
case msg := <-outCh:
......
......@@ -5,14 +5,14 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
)
// MessageBox
type MessageBox interface {
// MailBox
type MailBox interface {
AddMessage(messageID string, msg *WireMessage, remotePeer peer.ID)
RemoveMessage(messageID string)
GetMessages(messageID string) []*CachedMessage
}
type MessageBoxImp struct {
type MailBoxImp struct {
cache *lru.Cache
}
......@@ -21,17 +21,17 @@ type CachedMessage struct {
Message *WireMessage
}
// NewMessageBoxImp create a new MessageBoxImp
func NewMessageBoxImp() (*MessageBoxImp, error) {
// NewMailBoxImp create a new MailBoxImp
func NewMailBoxImp() (*MailBoxImp, error) {
cache, err := lru.New(128)
if err != nil {
return nil, err
}
return &MessageBoxImp{cache: cache}, nil
return &MailBoxImp{cache: cache}, nil
}
// AddMessage will add a message into the mail box
func (m *MessageBoxImp) AddMessage(messageID string, msg *WireMessage, remotePeer peer.ID) {
func (m *MailBoxImp) AddMessage(messageID string, msg *WireMessage, remotePeer peer.ID) {
// in a real scenario , we might not get multiple messages, but it might happen
c, ok := m.cache.Get(messageID)
if !ok {
......@@ -54,12 +54,12 @@ func (m *MessageBoxImp) AddMessage(messageID string, msg *WireMessage, remotePee
}
// RemoveMessage remove the given message from mailbox
func (m *MessageBoxImp) RemoveMessage(messageID string) {
func (m *MailBoxImp) RemoveMessage(messageID string) {
m.cache.Remove(messageID)
}
// GetMessages return a slice of cached messages
func (m *MessageBoxImp) GetMessages(messageID string) []*CachedMessage {
func (m *MailBoxImp) GetMessages(messageID string) []*CachedMessage {
if !m.cache.Contains(messageID) {
return nil
}
......
......@@ -32,7 +32,7 @@ type MessageValidator struct {
lock *sync.Mutex
cache map[string]*StandbyMessage
onMessageConfirmedCallback MessageConfirmedHandler
messageBox MessageBox
messageBox MailBox
callbackLock *sync.Mutex
}
......@@ -157,7 +157,7 @@ func (mv *MessageValidator) fireCallback(sm *StandbyMessage, key string) {
}
}
// Park a message into MessageBox usually it means the local party is not ready
// Park a message into MailBox usually it means the local party is not ready
func (mv *MessageValidator) Park(msg *WireMessage, remotePeer peer.ID) {
mv.messageBox.AddMessage(msg.MessageID, msg, remotePeer)
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment