From e4690cd66cef32f10249ddbde405f033a5069ffe Mon Sep 17 00:00:00 2001 From: Ursa <ursa@ninerealms.co> Date: Tue, 23 Apr 2024 09:45:45 -0400 Subject: [PATCH] [cleanup] Remove Non-Pipelined Signer Logic --- bifrost/signer/sign.go | 131 +------------------------------- build/docker/docker-compose.yml | 2 - 2 files changed, 2 insertions(+), 131 deletions(-) diff --git a/bifrost/signer/sign.go b/bifrost/signer/sign.go index fb10bb8441..9acbb1a9e8 100644 --- a/bifrost/signer/sign.go +++ b/bifrost/signer/sign.go @@ -209,29 +209,9 @@ func (s *Signer) processTransactions() { return } - // TODO: this forces new behavior in mocknet, remove after v1 logic is deprecated + // default to 10 if unset if signerConcurrency <= 0 { - signerConcurrencyEnv := os.Getenv("BIFROST_SIGNER_CONCURRENCY") - if signerConcurrencyEnv != "" { - signerConcurrency, err = strconv.ParseInt(signerConcurrencyEnv, 10, 64) - if err != nil { - s.logger.Error().Err(err).Msg("fail to parse BIFROST_SIGNER_CONCURRENCY") - return - } - } - } - - // if unset use v1 logic - if signerConcurrency <= 0 { - - // if previously set, wait for any running signings - if s.pipeline != nil { - s.pipeline.Wait() - s.pipeline = nil - } - - s.processTransactionsV1() - return + signerConcurrency = 10 } // if previously set to different concurrency, drain existing signings @@ -253,113 +233,6 @@ func (s *Signer) processTransactions() { s.pipeline.SpawnSignings(s, s.thorchainBridge) } -func (s *Signer) processTransactionsV1() { - wg := &sync.WaitGroup{} - for _, items := range s.storage.OrderedLists() { - wg.Add(1) - - go func(items []TxOutStoreItem) { - chain := items[0].TxOutItem.Chain // all items in a batch should be the same chain - - defer wg.Done() - - // precondition: all transactions should be for the same chain - for _, item := range items { - if !item.TxOutItem.Chain.Equals(chain) { - s.logger.Error().Msgf("tx out items for different chains in the same batch: %s, %s", item.TxOutItem.Chain, items[0].TxOutItem.Chain) - return - } - } - - // if any tx out items are in broadcast or round 7 failure retry, only proceed with those - retryItems := []TxOutStoreItem{} - for _, item := range items { - if item.Round7Retry || len(item.SignedTx) > 0 { - retryItems = append(retryItems, item) - } - } - if len(retryItems) > 0 { - s.logger.Info().Msgf("found %d retry items", len(retryItems)) - items = retryItems - } - if len(retryItems) > 1 { - s.logger.Error().Msgf("found %d retry items, there should only be one", len(retryItems)) - } - - for i, item := range items { - select { - case <-s.stopChan: - return - default: - if item.Status == TxSpent { // don't rebroadcast spent transactions - continue - } - - s.logger.Info(). - Int("num", i). - Int64("height", item.Height). - Int("status", int(item.Status)). - Interface("tx", item.TxOutItem). - Msg("Signing transaction") - - // a single keysign should not take longer than 5 minutes , regardless TSS or local - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) - checkpoint, obs, err := runWithContext(ctx, func() ([]byte, *types.TxInItem, error) { - return s.signAndBroadcast(item) - }) - if err != nil { - // mark the txout on round 7 failure to block other txs for the chain / pubkey - ksErr := tss.KeysignError{} - if errors.As(err, &ksErr) && ksErr.IsRound7() { - s.logger.Error().Err(err).Interface("tx", item.TxOutItem).Msg("round 7 signing error") - item.Round7Retry = true - item.Checkpoint = checkpoint - if storeErr := s.storage.Set(item); storeErr != nil { - s.logger.Error().Err(storeErr).Msg("fail to update tx out store item with round 7 retry") - } - } - - if errors.Is(err, context.DeadlineExceeded) { - panic(fmt.Errorf("tx out item: %+v , keysign timeout : %w", item.TxOutItem, err)) - } - s.logger.Error().Err(err).Msg("fail to sign and broadcast tx out store item") - cancel() - return - // The 'item' for loop should not be items[0], - // because problems which return 'nil, nil' should be skipped over instead of blocking others. - // When signAndBroadcast returns an error (such as from a keysign timeout), - // a 'return' and not a 'continue' should be used so that nodes can all restart the list, - // for when the keysign failure was from a loss of list synchrony. - // Otherwise, out-of-sync lists would cycle one timeout at a time, maybe never resynchronising. - } - cancel() - - // if enabled and the observation is non-nil, instant-observe without waiting for end of batch's range - // (aim to instant-observe SetSigned prior to any failed transaction RemoveSigned) - if s.cfg.AutoObserve && obs != nil { - s.observer.ObserveSigned(types.TxIn{ - Count: "1", - Chain: chain, - TxArray: []types.TxInItem{*obs}, - MemPool: true, - Filtered: true, - SentUnFinalised: false, - Finalised: false, - ConfirmationRequired: 0, - }, chain.IsEVM()) // Instant EVM observations have wrong gas and need future correct observations - } - - // We have a successful broadcast! Remove the item from our store - if err = s.storage.Remove(item); err != nil { - s.logger.Error().Err(err).Msg("fail to update tx out store item") - } - } - } - }(items) - } - wg.Wait() -} - // processTxnOut processes outbound TxOuts and save them to storage func (s *Signer) processTxnOut(ch <-chan types.TxOut, idx int) { s.logger.Info().Int("idx", idx).Msg("start to process tx out") diff --git a/build/docker/docker-compose.yml b/build/docker/docker-compose.yml index 6bb68a25f4..5ce469df05 100644 --- a/build/docker/docker-compose.yml +++ b/build/docker/docker-compose.yml @@ -179,8 +179,6 @@ services: BIFROST_CHAINS_AVAX_BLOCK_SCANNER_WHITELIST_TOKENS: "0x52C84043CD9c865236f11d9Fc9F56aa003c1f922,0xB97EF9Ef8734C71904D8002F8b6Bc66Dd9c48a6E" BIFROST_CHAINS_BSC_BLOCK_SCANNER_WHITELIST_TOKENS: "0x52C84043CD9c865236f11d9Fc9F56aa003c1f922,0x8AC76a51cc950d9822D68b83fE1Ad97B32Cd580d" - BIFROST_SIGNER_CONCURRENCY: "10" - ports: - 5040:5040 - 6040:6040 -- GitLab