Compare revisions

Commits on Source (8)
Showing 1717 additions and 110 deletions
@@ -145,19 +145,21 @@ format:
 lint:
     @./scripts/lint.sh
     @go run tools/analyze/main.go ./common/... ./constants/... ./x/... ./mimir/...
+    @go run tools/lint-whitelist-tokens/main.go
     @./scripts/trunk check --no-fix --upstream origin/develop

 lint-ci:
     @./scripts/lint.sh
     @go run tools/analyze/main.go ./common/... ./constants/... ./x/... ./mimir/...
+    @go run tools/lint-whitelist-tokens/main.go
+    @./scripts/lint-versions.bash
+    @./scripts/lint-mimir-ids.bash
 ifdef CI_MERGE_REQUEST_ID
     # only check changes on merge requests
     @./scripts/trunk check --ci -j8 --upstream FETCH_HEAD
 else
     @./scripts/trunk check --all --ci -j8
 endif
-    @./scripts/lint-versions.bash
-    @./scripts/lint-mimir-ids.bash

 # ------------------------------ Testing ------------------------------
...
@@ -40,11 +40,11 @@ import (
     "gitlab.com/thorchain/thornode/common/cosmos"
     "gitlab.com/thorchain/thornode/config"
     "gitlab.com/thorchain/thornode/constants"
+    "gitlab.com/thorchain/thornode/x/thorchain/aggregators"
     mem "gitlab.com/thorchain/thornode/x/thorchain/memo"
 )

 const (
-    maxGasLimit = 800000
     ethBlockRewardAndFee = 3 * 1e18
 )
@@ -512,7 +512,7 @@ func (c *Client) SignTx(tx stypes.TxOutItem, height int64) ([]byte, []byte, *sty
         // as long as we pass in an ETH value , which we almost guarantee it will not exceed the ETH balance , so we can avoid the above two errors
         estimatedETHValue = estimatedETHValue.SetInt64(21000)
     }
-    createdTx := etypes.NewTransaction(nonce, ecommon.HexToAddress(contractAddr.String()), estimatedETHValue, MaxContractGas, gasRate, data)
+    createdTx := etypes.NewTransaction(nonce, ecommon.HexToAddress(contractAddr.String()), estimatedETHValue, c.cfg.BlockScanner.MaxGasLimit, gasRate, data)
     estimatedGas, err := c.estimateGas(fromAddr.String(), createdTx)
     if err != nil {
         // in an edge case that vault doesn't have enough fund to fulfill an outbound transaction , it will fail to estimate gas
@@ -524,44 +524,58 @@ func (c *Client) SignTx(tx stypes.TxOutItem, height int64) ([]byte, []byte, *sty
     }
     c.logger.Info().Msgf("memo:%s estimated gas unit: %d", tx.Memo, estimatedGas)
-    gasOut := big.NewInt(0)
+    scheduledMaxFee := big.NewInt(0)
     for _, coin := range tx.MaxGas {
-        gasOut.Add(gasOut, c.convertThorchainAmountToWei(coin.Amount.BigInt()))
+        scheduledMaxFee.Add(scheduledMaxFee, c.convertThorchainAmountToWei(coin.Amount.BigInt()))
     }
-    totalGas := big.NewInt(int64(estimatedGas) * gasRate.Int64())
-    if ethValue.Uint64() > 0 {
-        if tx.Aggregator != "" {
-            // At this point, if this is is to an aggregator (which should be white-listed), allow the maximum gas.
-            if estimatedGas > maxGasLimit {
-                // the estimated gas unit is more than the maximum , so bring down the gas rate
-                maxGasWei := big.NewInt(1).Mul(big.NewInt(maxGasLimit), gasRate)
-                gasRate = big.NewInt(1).Div(maxGasWei, big.NewInt(int64(estimatedGas)))
-            } else {
-                estimatedGas = maxGasLimit // pay the maximum
-            }
-        } else {
-            // when the estimated gas is larger than the MaxGas that is allowed to be used
-            // adjust the gas price to reflect that , so not breach the MaxGas restriction
-            // This might cause the tx to delay
-            if totalGas.Cmp(gasOut) == 1 {
-                gasRate = gasOut.Div(gasOut, big.NewInt(int64(estimatedGas)))
-                c.logger.Info().Msgf("based on estimated gas unit (%d) , total gas will be %s, which is more than %s, so adjust gas rate to %s", estimatedGas, totalGas.String(), gasOut.String(), gasRate.String())
-            } else {
-                // override estimate gas with the max
-                estimatedGas = big.NewInt(0).Div(gasOut, gasRate).Uint64()
-                c.logger.Info().Msgf("transaction with memo %s can spend up to %d gas unit, gasRate:%s", tx.Memo, estimatedGas, gasRate)
-            }
-        }
-        createdTx = etypes.NewTransaction(nonce, ecommon.HexToAddress(contractAddr.String()), ethValue, estimatedGas, gasRate, data)
-    } else {
-        if estimatedGas > maxGasLimit {
-            // the estimated gas unit is more than the maximum , so bring down the gas rate
-            maxGasWei := big.NewInt(1).Mul(big.NewInt(maxGasLimit), gasRate)
-            gasRate = big.NewInt(1).Div(maxGasWei, big.NewInt(int64(estimatedGas)))
-        }
-        createdTx = etypes.NewTransaction(nonce, ecommon.HexToAddress(contractAddr.String()), ethValue, estimatedGas, gasRate, data)
-    }
+    if tx.Aggregator != "" {
+        var gasLimitForAggregator uint64
+        gasLimitForAggregator, err = aggregators.FetchDexAggregatorGasLimit(
+            common.LatestVersion, c.cfg.ChainID, tx.Aggregator,
+        )
+        if err != nil {
+            c.logger.Err(err).
+                Str("aggregator", tx.Aggregator).
+                Msg("fail to get aggregator gas limit, aborting to let thornode reschdule")
+            return nil, nil, nil, nil
+        }
+        // if the estimate gas is over the max, abort and let thornode reschedule for now
+        if estimatedGas > gasLimitForAggregator {
+            c.logger.Warn().
+                Stringer("in_hash", tx.InHash).
+                Uint64("estimated_gas", estimatedGas).
+                Uint64("aggregator_gas_limit", gasLimitForAggregator).
+                Msg("aggregator gas limit exceeded, aborting to let thornode reschedule")
+            return nil, nil, nil, nil
+        }
+        // set limit to aggregator gas limit
+        estimatedGas = gasLimitForAggregator
+        // aggregator swap outs currently ignore max gas, but abort if 10x over for safety
+        //
+        // TODO: Update thornode to take aggregator gas limit into consideration and set a
+        // max gas that should be respected.
+        scheduledMaxFee = scheduledMaxFee.Mul(scheduledMaxFee, big.NewInt(10))
+    }
+    // if over max scheduled gas, abort and let thornode reschedule
+    estimatedFee := big.NewInt(int64(estimatedGas) * gasRate.Int64())
+    if scheduledMaxFee.Cmp(estimatedFee) < 0 {
+        c.logger.Warn().
+            Stringer("in_hash", tx.InHash).
+            Str("estimated_fee", estimatedFee.String()).
+            Str("scheduled_max_fee", scheduledMaxFee.String()).
+            Msg("max gas exceeded, aborting to let thornode reschedule")
+        return nil, nil, nil, nil
+    }
+    createdTx = etypes.NewTransaction(
+        nonce, ecommon.HexToAddress(contractAddr.String()), ethValue, estimatedGas, gasRate, data,
+    )
     rawTx, err := c.sign(createdTx, tx.VaultPubKey, height, tx)
     if err != nil || len(rawTx) == 0 {
         return nil, nonceBytes, nil, fmt.Errorf("fail to sign message: %w", err)
@@ -917,7 +931,7 @@ func (c *Client) ReportSolvency(ethBlockHeight int64) error {
         c.logger.Err(err).Msgf("fail to get account balance")
         continue
     }
-    if runners.IsVaultSolvent(acct, asgard, cosmos.NewUint(3*MaxContractGas*c.ethScanner.lastReportedGasPrice)) && c.IsBlockScannerHealthy() {
+    if runners.IsVaultSolvent(acct, asgard, cosmos.NewUint(3*c.cfg.BlockScanner.MaxGasLimit*c.ethScanner.lastReportedGasPrice)) && c.IsBlockScannerHealthy() {
         // when vault is solvent , don't need to report solvency
         // when block scanner is not healthy , usually that means the chain is halted , in that scenario , we continue to report solvency
         continue
...
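For orientation, the guard added above compares the fee the signer is about to pay (estimated gas units times the gas rate, in wei) against the fee THORNode scheduled through MaxGas; aggregator swap-outs get 10x headroom on that scheduled fee, and anything over the limit is aborted so THORNode can reschedule it. The standalone sketch below only reproduces that comparison with made-up numbers; it leaves out the aggregator gas-limit clamp, and the names and values are illustrative rather than part of the MR.

package main

import (
    "fmt"
    "math/big"
)

func main() {
    // 0.003 ETH scheduled by thornode as MaxGas, already converted to wei
    scheduledMaxFee := big.NewInt(3_000_000_000_000_000)

    estimatedGas := int64(400_000)        // gas units from eth_estimateGas
    gasRate := big.NewInt(10_000_000_000) // 10 gwei per gas unit

    // estimated fee in wei = gas units * gas rate (0.004 ETH here)
    estimatedFee := new(big.Int).Mul(big.NewInt(estimatedGas), gasRate)

    // plain outbound: abort when the scheduled max fee is below the estimate
    fmt.Println("plain outbound aborts:", scheduledMaxFee.Cmp(estimatedFee) < 0) // true

    // aggregator swap-out: MaxGas is effectively ignored, but abort if more than 10x over
    aggregatorCap := new(big.Int).Mul(scheduledMaxFee, big.NewInt(10))
    fmt.Println("aggregator outbound aborts:", aggregatorCap.Cmp(estimatedFee) < 0) // false
}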
@@ -44,7 +44,6 @@ type SolvencyReporter func(int64) error
 const (
     BlockCacheSize = 6000
-    MaxContractGas = 80000
     ethToken       = "0x0000000000000000000000000000000000000000"
     symbolMethod   = "symbol"
     decimalMethod  = "decimals"
@@ -219,7 +218,7 @@ func (e *ETHScanner) FetchTxs(height, chainHeight int64) (stypes.TxIn, error) {
     // post to thorchain if there is a fee and it has changed
     if gasPrice.Cmp(big.NewInt(0)) != 0 && tcGasPrice != e.lastReportedGasPrice {
-        if _, err = e.bridge.PostNetworkFee(height, common.ETHChain, MaxContractGas, tcGasPrice); err != nil {
+        if _, err = e.bridge.PostNetworkFee(height, common.ETHChain, e.cfg.MaxGasLimit, tcGasPrice); err != nil {
             e.logger.Err(err).Msg("fail to post ETH chain single transfer fee to THORNode")
         } else {
             e.lastReportedGasPrice = tcGasPrice
...
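PostNetworkFee reports two numbers per chain: a gas-unit size for a single outbound (previously the hard-coded MaxContractGas, now the configurable MaxGasLimit) and the gas rate in THORChain base units. THORNode derives the outbound fee from roughly their product, and ReportSolvency (earlier in this diff) keeps a 3x buffer of that product before reporting a vault as insolvent. A tiny illustrative calculation with made-up numbers, assuming the 1e8-based units the scanners convert to; this is not part of the MR:

package main

import "fmt"

func main() {
    // illustrative values only
    maxGasLimit := uint64(80_000) // gas units allowed for a single contract call
    tcGasPrice := uint64(3)       // gas rate reported to thornode, in THORChain base units

    singleTransferFee := maxGasLimit * tcGasPrice // rough per-outbound fee thornode works with
    solvencyBuffer := 3 * singleTransferFee       // the 3x headroom used by ReportSolvency

    fmt.Println(singleTransferFee, solvencyBuffer) // 240000 720000
}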
@@ -333,7 +333,7 @@ func (s *EthereumSuite) TestClient(c *C) {
 "max_gas": [
     {
         "asset": "ETH.ETH",
-        "amount": "300000"
+        "amount": "600000"
     }
 ],
 "gas_rate":1
@@ -394,6 +394,7 @@ func (s *EthereumSuite) TestSignETHTx(c *C) {
     BlockScanner: config.BifrostBlockScannerConfiguration{
         StartBlockHeight: 1, // avoids querying thorchain for block height
         HTTPRequestTimeout: time.Second,
+        MaxGasLimit: 80000,
     },
 }, nil, s.bridge, s.m, pubkeyMgr, poolMgr)
 c.Assert(err, IsNil)
@@ -474,7 +475,7 @@ func (s *EthereumSuite) TestSignETHTx(c *C) {
         common.NewCoin(common.ETHAsset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.ETHAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.ETHAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*8)),
     },
     GasRate: 1,
     Memo: "OUT:4D91ADAFA69765E7805B5FF2F3A0BA1DBE69E37A1CFCD20C48B99C528AA3EE87",
@@ -498,7 +499,7 @@ func (s *EthereumSuite) TestSignETHTx(c *C) {
         common.NewCoin(asset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.ETHAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.ETHAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*8)),
     },
     GasRate: 1,
     Memo: "OUT:4D91ADAFA69765E7805B5FF2F3A0BA1DBE69E37A1CFCD20C48B99C528AA3EE87",
@@ -519,7 +520,7 @@ func (s *EthereumSuite) TestSignETHTx(c *C) {
         common.NewCoin(common.ETHAsset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.ETHAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.ETHAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*8)),
     },
     GasRate: 1,
     Memo: "REFUND:4D91ADAFA69765E7805B5FF2F3A0BA1DBE69E37A1CFCD20C48B99C528AA3EE87",
@@ -540,7 +541,7 @@ func (s *EthereumSuite) TestSignETHTx(c *C) {
         common.NewCoin(asset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.ETHAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.ETHAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*8)),
     },
     GasRate: 1,
     Memo: "OUT:4D91ADAFA69765E7805B5FF2F3A0BA1DBE69E37A1CFCD20C48B99C528AA3EE87",
@@ -561,7 +562,7 @@ func (s *EthereumSuite) TestSignETHTx(c *C) {
         common.NewCoin(common.ETHAsset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.ETHAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.ETHAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*8)),
     },
     GasRate: 1,
     Memo: "MIGRATE:1024",
@@ -582,7 +583,7 @@ func (s *EthereumSuite) TestSignETHTx(c *C) {
         common.NewCoin(asset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.ETHAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.ETHAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*8)),
     },
     GasRate: 1,
     Memo: "MIGRATE:1024",
...
@@ -156,7 +156,7 @@ func (c *Client) unstuckTx(clog zerolog.Logger, item types.SignedTxItem) error {
     if inflatedOriginalGasPrice.Cmp(currentGasRate) > 0 {
         currentGasRate = big.NewInt(1).Mul(originGasPrice, big.NewInt(2))
     }
-    canceltx := etypes.NewTransaction(tx.Nonce(), ecommon.HexToAddress(address.String()), big.NewInt(0), MaxContractGas, currentGasRate, nil)
+    canceltx := etypes.NewTransaction(tx.Nonce(), ecommon.HexToAddress(address.String()), big.NewInt(0), c.cfg.BlockScanner.MaxGasLimit, currentGasRate, nil)
     rawBytes, err := c.kw.Sign(canceltx, pubKey)
     if err != nil {
         return fmt.Errorf("fail to sign tx for cancelling with nonce: %d, err: %w", tx.Nonce(), err)
...
@@ -33,6 +33,7 @@ import (
     "gitlab.com/thorchain/thornode/common/cosmos"
     "gitlab.com/thorchain/thornode/config"
     "gitlab.com/thorchain/thornode/constants"
+    "gitlab.com/thorchain/thornode/x/thorchain/aggregators"
     mem "gitlab.com/thorchain/thornode/x/thorchain/memo"
     tssp "gitlab.com/thorchain/tss/go-tss/tss"
 )
@@ -514,7 +515,7 @@ func (c *EVMClient) buildOutboundTx(txOutItem stypes.TxOutItem, memo mem.Memo, n
         // as long as we pass in an EVM value , which we almost guarantee it will not exceed the EVM balance , so we can avoid the above two errors
         estimatedEVMValue = estimatedEVMValue.SetInt64(21000)
     }
-    createdTx := etypes.NewTransaction(nonce, ecommon.HexToAddress(contractAddr.String()), estimatedEVMValue, MaxContractGas, gasRate, txData)
+    createdTx := etypes.NewTransaction(nonce, ecommon.HexToAddress(contractAddr.String()), estimatedEVMValue, c.cfg.BlockScanner.MaxGasLimit, gasRate, txData)
     estimatedGas, err := c.evmScanner.ethRpc.EstimateGas(fromAddr.String(), createdTx)
     if err != nil {
         // in an edge case that vault doesn't have enough fund to fulfill an outbound transaction , it will fail to estimate gas
@@ -525,44 +526,58 @@ func (c *EVMClient) buildOutboundTx(txOutItem stypes.TxOutItem, memo mem.Memo, n
         return nil, nil
     }
-    gasOut := big.NewInt(0)
+    scheduledMaxFee := big.NewInt(0)
     for _, coin := range txOutItem.MaxGas {
-        gasOut.Add(gasOut, convertThorchainAmountToWei(coin.Amount.BigInt()))
+        scheduledMaxFee.Add(scheduledMaxFee, convertThorchainAmountToWei(coin.Amount.BigInt()))
     }
-    totalGas := big.NewInt(int64(estimatedGas) * gasRate.Int64())
-    if evmValue.Uint64() > 0 {
-        // when the estimated gas is larger than the MaxGas that is allowed to be used
-        // adjust the gas price to reflect that , so not breach the MaxGas restriction
-        // This might cause the tx to delay
-        if totalGas.Cmp(gasOut) == 1 {
-            // At this point, if this is is to an aggregator (which should be white-listed), allow the maximum gas.
-            if txOutItem.Aggregator == "" {
-                gasRate = gasOut.Div(gasOut, big.NewInt(int64(estimatedGas)))
-                c.logger.Info().Msgf("based on estimated gas unit (%d) , total gas will be %s, which is more than %s, so adjust gas rate to %s", estimatedGas, totalGas.String(), gasOut.String(), gasRate.String())
-            } else {
-                if estimatedGas > uint64(c.cfg.BlockScanner.MaxGasFee) {
-                    // the estimated gas unit is more than the maximum , so bring down the gas rate
-                    maxGasWei := big.NewInt(1).Mul(big.NewInt(c.cfg.BlockScanner.MaxGasFee), gasRate)
-                    gasRate = big.NewInt(1).Div(maxGasWei, big.NewInt(int64(estimatedGas)))
-                } else {
-                    estimatedGas = uint64(c.cfg.BlockScanner.MaxGasFee) // pay the maximum
-                }
-            }
-        } else {
-            // override estimate gas with the max
-            estimatedGas = big.NewInt(0).Div(gasOut, gasRate).Uint64()
-            c.logger.Info().Str("memo", txOutItem.Memo).Uint64("estimatedGas", estimatedGas).Int64("gasRate", gasRate.Int64()).Msg("override estimate gas with max")
-        }
-        createdTx = etypes.NewTransaction(nonce, ecommon.HexToAddress(contractAddr.String()), evmValue, estimatedGas, gasRate, txData)
-    } else {
-        if estimatedGas > uint64(c.cfg.BlockScanner.MaxGasFee) {
-            // the estimated gas unit is more than the maximum , so bring down the gas rate
-            maxGasWei := big.NewInt(1).Mul(big.NewInt(c.cfg.BlockScanner.MaxGasFee), gasRate)
-            gasRate = big.NewInt(1).Div(maxGasWei, big.NewInt(int64(estimatedGas)))
-        }
-        createdTx = etypes.NewTransaction(nonce, ecommon.HexToAddress(contractAddr.String()), evmValue, estimatedGas, gasRate, txData)
-    }
+    if txOutItem.Aggregator != "" {
+        var gasLimitForAggregator uint64
+        gasLimitForAggregator, err = aggregators.FetchDexAggregatorGasLimit(
+            common.LatestVersion, c.cfg.ChainID, txOutItem.Aggregator,
+        )
+        if err != nil {
+            c.logger.Err(err).
+                Str("aggregator", txOutItem.Aggregator).
+                Msg("fail to get aggregator gas limit, aborting to let thornode reschdule")
+            return nil, nil
+        }
+        // if the estimate gas is over the max, abort and let thornode reschedule for now
+        if estimatedGas > gasLimitForAggregator {
+            c.logger.Warn().
+                Stringer("in_hash", txOutItem.InHash).
+                Uint64("estimated_gas", estimatedGas).
+                Uint64("aggregator_gas_limit", gasLimitForAggregator).
+                Msg("swap out gas limit exceeded, aborting to let thornode reschedule")
+            return nil, nil
+        }
+        // set limit to aggregator gas limit
+        estimatedGas = gasLimitForAggregator
+        // aggregator swap outs currently ignore max gas, but abort if 10x over for safety
+        //
+        // TODO: Update thornode to take aggregator gas limit into consideration and set a
+        // max gas that should be respected.
+        scheduledMaxFee = scheduledMaxFee.Mul(scheduledMaxFee, big.NewInt(10))
+    }
+    // if over max scheduled gas, abort and let thornode reschedule
+    estimatedFee := big.NewInt(int64(estimatedGas) * gasRate.Int64())
+    if scheduledMaxFee.Cmp(estimatedFee) < 0 {
+        c.logger.Warn().
+            Stringer("in_hash", txOutItem.InHash).
+            Str("estimated_fee", estimatedFee.String()).
+            Str("scheduled_max_fee", scheduledMaxFee.String()).
+            Msg("max gas exceeded, aborting to let thornode reschedule")
+        return nil, nil
+    }
+    createdTx = etypes.NewTransaction(
+        nonce, ecommon.HexToAddress(contractAddr.String()), evmValue, estimatedGas, gasRate, txData,
+    )
     return createdTx, nil
 }
@@ -645,6 +660,11 @@ func (c *EVMClient) SignTx(tx stypes.TxOutItem, height int64) ([]byte, []byte, *
         return nil, nil, nil, err
     }
+    // if transaction is nil, abort to allow thornode reschedule
+    if outboundTx == nil {
+        return nil, nil, nil, nil
+    }
     rawTx, err := c.sign(outboundTx, tx.VaultPubKey, height, tx)
     if err != nil || len(rawTx) == 0 {
         return nil, nonceBytes, nil, fmt.Errorf("fail to sign message: %w", err)
@@ -809,7 +829,7 @@ func (c *EVMClient) ReportSolvency(height int64) error {
         return fmt.Errorf("fail to get asgards, err: %w", err)
     }
-    currentGasFee := cosmos.NewUint(3 * MaxContractGas * c.evmScanner.lastReportedGasPrice)
+    currentGasFee := cosmos.NewUint(3 * c.cfg.BlockScanner.MaxGasLimit * c.evmScanner.lastReportedGasPrice)
     for _, asgard := range asgardVaults {
         var acct common.Account
...
@@ -415,6 +415,7 @@ func (s *EVMSuite) TestSignEVMTx(c *C) {
     RPCHost: "http://" + s.server.Listener.Addr().String(),
     StartBlockHeight: 1, // avoids querying thorchain for block height
     HTTPRequestTimeout: time.Second,
+    MaxGasLimit: 80000,
     },
 }, nil, s.bridge, s.m, pubkeyMgr, poolMgr)
 c.Assert(err, IsNil)
@@ -495,7 +496,7 @@ func (s *EVMSuite) TestSignEVMTx(c *C) {
         common.NewCoin(common.AVAXAsset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.AVAXAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.AVAXAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*4)),
     },
     GasRate: 1,
     Memo: "OUT:4D91ADAFA69765E7805B5FF2F3A0BA1DBE69E37A1CFCD20C48B99C528AA3EE87",
@@ -519,7 +520,7 @@ func (s *EVMSuite) TestSignEVMTx(c *C) {
         common.NewCoin(asset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.AVAXAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.AVAXAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*4)),
     },
     GasRate: 1,
     Memo: "OUT:4D91ADAFA69765E7805B5FF2F3A0BA1DBE69E37A1CFCD20C48B99C528AA3EE87",
@@ -540,7 +541,7 @@ func (s *EVMSuite) TestSignEVMTx(c *C) {
         common.NewCoin(common.AVAXAsset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.AVAXAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.AVAXAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*4)),
     },
     GasRate: 1,
     Memo: "REFUND:4D91ADAFA69765E7805B5FF2F3A0BA1DBE69E37A1CFCD20C48B99C528AA3EE87",
@@ -561,7 +562,7 @@ func (s *EVMSuite) TestSignEVMTx(c *C) {
         common.NewCoin(asset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.AVAXAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.AVAXAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*4)),
     },
     GasRate: 1,
     Memo: "OUT:4D91ADAFA69765E7805B5FF2F3A0BA1DBE69E37A1CFCD20C48B99C528AA3EE87",
@@ -582,7 +583,7 @@ func (s *EVMSuite) TestSignEVMTx(c *C) {
         common.NewCoin(common.AVAXAsset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.AVAXAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.AVAXAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*4)),
     },
     GasRate: 1,
     Memo: "MIGRATE:1024",
@@ -603,7 +604,7 @@ func (s *EVMSuite) TestSignEVMTx(c *C) {
         common.NewCoin(asset, cosmos.NewUint(1e18)),
     },
     MaxGas: common.Gas{
-        common.NewCoin(common.AVAXAsset, cosmos.NewUint(MaxContractGas)),
+        common.NewCoin(common.AVAXAsset, cosmos.NewUint(e.cfg.BlockScanner.MaxGasLimit*4)),
     },
     GasRate: 1,
     Memo: "MIGRATE:1024",
...
@@ -9,8 +9,6 @@ var routerContractABI string
 var erc20ContractABI string

 const (
-    MaxContractGas = 80000
     defaultDecimals = 18 // evm chains consolidate all decimals to 18 (wei)
     tenGwei = 10000000000
 )
@@ -615,7 +615,7 @@ func (e *EVMScanner) reportNetworkFee(height int64) {
     tcGasPrice := new(big.Int).Div(gasPrice, big.NewInt(common.One*100))
     // post to thorchain
-    if _, err := e.bridge.PostNetworkFee(height, e.cfg.ChainID, MaxContractGas, tcGasPrice.Uint64()); err != nil {
+    if _, err := e.bridge.PostNetworkFee(height, e.cfg.ChainID, e.cfg.MaxGasLimit, tcGasPrice.Uint64()); err != nil {
         e.logger.Err(err).Msg("failed to post EVM chain single transfer fee to THORNode")
     } else {
         e.lastReportedGasPrice = gasPrice.Uint64()
...
@@ -164,7 +164,7 @@ func (c *EVMClient) unstuckTx(clog zerolog.Logger, item evmtypes.SignedTxItem) e
     tx.Nonce(),
     ecommon.HexToAddress(address.String()),
     big.NewInt(0),
-    MaxContractGas,
+    c.cfg.BlockScanner.MaxGasLimit,
     currentGasRate,
     nil,
 )
...
package signer
import (
"fmt"
"time"
"github.com/rs/zerolog/log"
"gitlab.com/thorchain/thornode/bifrost/thorclient"
"gitlab.com/thorchain/thornode/common"
"gitlab.com/thorchain/thornode/x/thorchain/types"
)
////////////////////////////////////////////////////////////////////////////////////////
// Internal Types
////////////////////////////////////////////////////////////////////////////////////////
// vaultChain is a public key and chain used as a key for the vault/chain lock.
type vaultChain struct {
Vault common.PubKey
Chain common.Chain
}
type semaphore chan struct{}
// acquire will asynchronously acquire all available capacity from the semaphore.
func (s semaphore) acquire() int {
count := 0
for {
select {
case s <- struct{}{}:
count++
default:
return count
}
}
}
// release will release the provided count to the semaphore.
func (s semaphore) release(count int) {
for i := 0; i < count; i++ {
<-s
}
}
// pipelineSigner is the signer interface required for the pipeline.
type pipelineSigner interface {
isStopped() bool
storageList() []TxOutStoreItem
processTransaction(item TxOutStoreItem)
}
////////////////////////////////////////////////////////////////////////////////////////
// pipeline
////////////////////////////////////////////////////////////////////////////////////////
type pipeline struct {
// concurrency is the number of concurrent signing routines to allow.
concurrency int64
// vaultStatusConcurrency maps vault status to a semaphore for concurrent signings.
vaultStatusConcurrency map[types.VaultStatus]semaphore
// vaultChainLock maps a vault/chain combination to a lock. The lock is represented as
// a channel instead of a mutex so we can check if it is taken without blocking.
vaultChainLock map[vaultChain]chan struct{}
}
// newPipeline creates a new pipeline instance using the provided concurrency for the
// active and retiring vault status semaphores. The inactive vault status semaphore is
// always 1, allowing only one concurrent signing routine for inactive vault refunds.
func newPipeline(concurrency int64) (*pipeline, error) {
log.Info().Int64("concurrency", concurrency).Msg("creating new signer pipeline")
if concurrency < 1 {
return nil, fmt.Errorf("concurrency must be greater than 0")
}
return &pipeline{
concurrency: concurrency,
vaultStatusConcurrency: map[types.VaultStatus]semaphore{
types.VaultStatus_ActiveVault: make(semaphore, int(concurrency)),
types.VaultStatus_RetiringVault: make(semaphore, int(concurrency)),
types.VaultStatus_InactiveVault: make(semaphore, 1),
},
vaultChainLock: make(map[vaultChain]chan struct{}),
}, nil
}
// SpawnSignings will fetch all transactions from the provided Signer's storage, and
// start signing routines for any transactions that have:
// 1. Sufficient capacity in the vault status semaphore for the source vault's status.
// 2. An available lock on the vault/chain combination (only 1 can run at a time).
//
// The signing routines will be spawned in a goroutine, and this function will not
// block on their completion. The spawned routines will release the corresponding vault
// status semaphore and vault/chain lock when they are complete.
func (p *pipeline) SpawnSignings(s pipelineSigner, bridge thorclient.ThorchainBridge) {
allItems := s.storageList()
// gather all vault/chain combinations with an out item in retry
retryItems := make(map[vaultChain][]TxOutStoreItem)
for _, item := range allItems {
if item.Round7Retry || len(item.SignedTx) > 0 {
vc := vaultChain{item.TxOutItem.VaultPubKey, item.TxOutItem.Chain}
retryItems[vc] = append(retryItems[vc], item)
}
}
var itemsToSign []TxOutStoreItem
// add retry items to our items to sign
for _, items := range retryItems {
// there should be no vault/chain with more than 1 item in retry
if len(items) > 1 {
for i := range items { // sanitize signed tx for log
items[i].SignedTx = nil
}
log.Error().
Interface("items", items).
Msg("found multiple retry items for vault/chain")
} else {
itemsToSign = append(itemsToSign, items[0])
}
}
// add all items from vault/chains with no items in retry
for _, item := range allItems {
vc := vaultChain{item.TxOutItem.VaultPubKey, item.TxOutItem.Chain}
if _, ok := retryItems[vc]; !ok {
itemsToSign = append(itemsToSign, item)
}
}
// get the available capacities for each vault status
availableCapacities := make(map[types.VaultStatus]int)
for status, semaphore := range p.vaultStatusConcurrency {
availableCapacities[status] = semaphore.acquire()
}
// release remaining capacity for each vault status on return
defer func() {
for status, capacity := range availableCapacities {
p.vaultStatusConcurrency[status].release(capacity)
}
}()
// get all locked vault/chains - otherwise races if a vault/chain unlocks mid-iteration
lockedVaultChains := make(map[vaultChain]bool)
for vc, lock := range p.vaultChainLock {
if len(lock) > 0 {
lockedVaultChains[vc] = true
}
}
// spawn signing routines for each item
for _, item := range itemsToSign {
// return if the signer is stopped
if s.isStopped() {
return
}
vc := vaultChain{item.TxOutItem.VaultPubKey, item.TxOutItem.Chain}
// check if the vault/chain is locked
if lockedVaultChains[vc] {
continue
}
// if no lock exists, create one
if _, ok := p.vaultChainLock[vc]; !ok {
p.vaultChainLock[vc] = make(chan struct{}, 1)
}
// get vault to determine vault status
vault, err := bridge.GetVault(item.TxOutItem.VaultPubKey.String())
if err != nil {
log.Err(err).
Stringer("vault_pubkey", item.TxOutItem.VaultPubKey).
Msg("failed to get tx out item vault")
return
}
// check if the vault status semaphore has capacity
if availableCapacities[vault.Status] == 0 {
continue
}
// acquire the vault status semaphore and vault/chain lock
availableCapacities[vault.Status]--
p.vaultChainLock[vc] <- struct{}{}
lockedVaultChains[vc] = true
// spawn signing routine
go func(item TxOutStoreItem, vaultStatus types.VaultStatus) {
// release the vault status semaphore and vault/chain lock when complete
defer func() {
vc2 := vaultChain{item.TxOutItem.VaultPubKey, item.TxOutItem.Chain}
<-p.vaultChainLock[vc2]
p.vaultStatusConcurrency[vaultStatus].release(1)
}()
// process the transaction
s.processTransaction(item)
}(item, vault.Status)
}
}
// Wait will block until all pipeline signing routines have completed.
func (p *pipeline) Wait() {
log.Info().Msg("waiting for signer pipeline routines to complete")
for {
running := false
for _, semaphore := range p.vaultStatusConcurrency {
if len(semaphore) > 0 {
running = true
break
}
}
if !running {
log.Info().Msg("signer pipeline routines complete")
return
}
time.Sleep(time.Second)
}
}
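The semaphore above is just a buffered channel used without blocking: acquire grabs however many slots are currently free and reports the count, and release returns whatever was not used. A minimal standalone illustration of that pattern follows; it is not part of the MR and the values are arbitrary.

package main

import "fmt"

type semaphore chan struct{}

// acquire grabs every currently-free slot without blocking and reports how many it got.
func (s semaphore) acquire() int {
    count := 0
    for {
        select {
        case s <- struct{}{}:
            count++
        default:
            return count
        }
    }
}

// release returns count slots to the semaphore.
func (s semaphore) release(count int) {
    for i := 0; i < count; i++ {
        <-s
    }
}

func main() {
    sem := make(semaphore, 3)

    free := sem.acquire()
    fmt.Println(free) // 3: all capacity was free

    // pretend one signing routine was started; give back the other two slots
    sem.release(free - 1)

    fmt.Println(sem.acquire()) // 2: one slot is still held by the "running" routine
}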
package signer
import (
"sync"
"github.com/rs/zerolog/log"
"gitlab.com/thorchain/thornode/bifrost/thorclient/types"
"gitlab.com/thorchain/thornode/common"
ttypes "gitlab.com/thorchain/thornode/x/thorchain/types"
. "gopkg.in/check.v1"
)
////////////////////////////////////////////////////////////////////////////////////////
// Init
////////////////////////////////////////////////////////////////////////////////////////
func init() {
// add caller to logger for debugging
log.Logger = log.With().Caller().Logger()
}
////////////////////////////////////////////////////////////////////////////////////////
// mockPipelineSigner
////////////////////////////////////////////////////////////////////////////////////////
type mockPipelineSigner struct {
sync.Mutex
stopped bool
storageListItems []TxOutStoreItem
processed []TxOutStoreItem
}
func (m *mockPipelineSigner) isStopped() bool {
return m.stopped
}
func (m *mockPipelineSigner) storageList() []TxOutStoreItem {
return m.storageListItems
}
func (m *mockPipelineSigner) processTransaction(item TxOutStoreItem) {
m.Lock()
defer m.Unlock()
// set processed
m.processed = append(m.processed, item)
// remove from storage list
for i, tx := range m.storageListItems {
if tx.TxOutItem.Equals(item.TxOutItem) {
m.storageListItems = append(m.storageListItems[:i], m.storageListItems[i+1:]...)
break
}
}
}
////////////////////////////////////////////////////////////////////////////////////////
// Test Data
////////////////////////////////////////////////////////////////////////////////////////
var (
vault1 = ttypes.GetRandomPubKey()
vault2 = ttypes.GetRandomPubKey()
tosis = []TxOutStoreItem{
{
TxOutItem: types.TxOutItem{
Chain: common.BTCChain,
ToAddress: ttypes.GetRandomBTCAddress(),
VaultPubKey: vault1,
},
},
{
TxOutItem: types.TxOutItem{
Chain: common.BTCChain,
ToAddress: ttypes.GetRandomBTCAddress(),
VaultPubKey: vault2,
},
},
{ // same vault/chain as previous, should not run concurrently
TxOutItem: types.TxOutItem{
Chain: common.BTCChain,
ToAddress: ttypes.GetRandomBTCAddress(),
VaultPubKey: vault2,
},
},
{
TxOutItem: types.TxOutItem{
Chain: common.ETHChain,
ToAddress: ttypes.GetRandomETHAddress(),
VaultPubKey: vault1,
},
},
{
TxOutItem: types.TxOutItem{
Chain: common.ETHChain,
ToAddress: ttypes.GetRandomETHAddress(),
VaultPubKey: vault2,
},
},
}
)
////////////////////////////////////////////////////////////////////////////////////////
// PipelineSigner
////////////////////////////////////////////////////////////////////////////////////////
type PipelineSuite struct{}
var _ = Suite(&PipelineSuite{})
func (s *PipelineSuite) TestPipelineInit(c *C) {
// valid
for i := 1; i < 3; i++ {
pipeline, err := newPipeline(int64(i))
c.Assert(pipeline, NotNil)
c.Assert(err, IsNil)
}
// invalid
for i := -1; i < 1; i++ {
pipeline, err := newPipeline(int64(i))
c.Assert(pipeline, IsNil)
c.Assert(err, NotNil)
}
}
func (s *PipelineSuite) TestPipelineSequential(c *C) {
pipeline, err := newPipeline(1)
c.Assert(pipeline, NotNil)
c.Assert(err, IsNil)
// mocks
bridge := fakeBridge{nil}
mockSigner := &mockPipelineSigner{
storageListItems: append([]TxOutStoreItem{}, tosis...),
}
// spawn one signing
mockSigner.Lock() // prevent signing completion
pipeline.SpawnSignings(mockSigner, bridge)
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 1)
c.Assert(len(mockSigner.storageListItems), Equals, 5)
c.Assert(len(mockSigner.processed), Equals, 0)
// attempting another should be noop since semaphore is taken
pipeline.SpawnSignings(mockSigner, bridge)
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 1)
c.Assert(len(mockSigner.storageListItems), Equals, 5)
c.Assert(len(mockSigner.processed), Equals, 0)
// release lock and first signing should complete
mockSigner.Unlock()
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 4)
c.Assert(len(mockSigner.processed), Equals, 1)
c.Assert(mockSigner.processed[0].TxOutItem.Equals(tosis[0].TxOutItem), Equals, true)
// complete remaining signings
pipeline.SpawnSignings(mockSigner, bridge)
pipeline.Wait()
pipeline.SpawnSignings(mockSigner, bridge)
pipeline.Wait()
pipeline.SpawnSignings(mockSigner, bridge)
pipeline.Wait()
pipeline.SpawnSignings(mockSigner, bridge)
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 0)
c.Assert(len(mockSigner.processed), Equals, 5)
c.Assert(mockSigner.processed[1].TxOutItem.Equals(tosis[1].TxOutItem), Equals, true)
c.Assert(mockSigner.processed[2].TxOutItem.Equals(tosis[2].TxOutItem), Equals, true)
c.Assert(mockSigner.processed[3].TxOutItem.Equals(tosis[3].TxOutItem), Equals, true)
}
func (s *PipelineSuite) TestPipelineSequentialStop(c *C) {
pipeline, err := newPipeline(1)
c.Assert(pipeline, NotNil)
c.Assert(err, IsNil)
// mocks
bridge := fakeBridge{nil}
mockSigner := &mockPipelineSigner{
storageListItems: append([]TxOutStoreItem{}, tosis...),
}
// spawn one signing
mockSigner.Lock()
pipeline.SpawnSignings(mockSigner, bridge)
// stop the signer
mockSigner.stopped = true
// release lock and first signing should complete
mockSigner.Unlock()
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 4)
c.Assert(len(mockSigner.processed), Equals, 1)
// no more signings should be spawned
pipeline.SpawnSignings(mockSigner, bridge)
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 4)
c.Assert(len(mockSigner.processed), Equals, 1)
}
func (s *PipelineSuite) TestPipelineConcurrent(c *C) {
pipeline, err := newPipeline(10)
c.Assert(pipeline, NotNil)
c.Assert(err, IsNil)
// mocks
bridge := fakeBridge{nil}
mockSigner := &mockPipelineSigner{
storageListItems: append([]TxOutStoreItem{}, tosis...),
}
// spawn signings - only 4/5 of test data should be concurrent
mockSigner.Lock()
pipeline.SpawnSignings(mockSigner, bridge)
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 4)
c.Assert(len(mockSigner.storageListItems), Equals, 5)
c.Assert(len(mockSigner.processed), Equals, 0)
// release lock and all signings should complete
mockSigner.Unlock()
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 1)
c.Assert(len(mockSigner.processed), Equals, 4)
// the remaining signing should be the 3rd item
c.Assert(mockSigner.storageListItems[0].TxOutItem.Equals(tosis[2].TxOutItem), Equals, true)
// complete remaining signings
pipeline.SpawnSignings(mockSigner, bridge)
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.processed), Equals, 5)
c.Assert(len(mockSigner.storageListItems), Equals, 0)
}
func (s *PipelineSuite) TestPipelineConcurrentStop(c *C) {
pipeline, err := newPipeline(10)
c.Assert(pipeline, NotNil)
c.Assert(err, IsNil)
// mocks
bridge := fakeBridge{nil}
mockSigner := &mockPipelineSigner{
storageListItems: append([]TxOutStoreItem{}, tosis...),
}
// spawn signings - only 4/5 of test data should be concurrent
mockSigner.Lock()
pipeline.SpawnSignings(mockSigner, bridge)
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 4)
c.Assert(len(mockSigner.storageListItems), Equals, 5)
c.Assert(len(mockSigner.processed), Equals, 0)
// stop the signer
mockSigner.stopped = true
// release lock and all signings should complete
mockSigner.Unlock()
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 1)
c.Assert(len(mockSigner.processed), Equals, 4)
// no more signings should be spawned
pipeline.SpawnSignings(mockSigner, bridge)
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 1)
c.Assert(len(mockSigner.processed), Equals, 4)
// the remaining signing should be the 3rd item
c.Assert(mockSigner.storageListItems[0].TxOutItem.Equals(tosis[2].TxOutItem), Equals, true)
}
func (s *PipelineSuite) TestPipelineConcurrentLimited(c *C) {
pipeline, err := newPipeline(2)
c.Assert(pipeline, NotNil)
c.Assert(err, IsNil)
// mocks
bridge := fakeBridge{nil}
mockSigner := &mockPipelineSigner{
storageListItems: append([]TxOutStoreItem{}, tosis...),
}
// spawn signings - 4/5 test data should be concurrent, but semaphore limits to 2
mockSigner.Lock()
pipeline.SpawnSignings(mockSigner, bridge)
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 2)
c.Assert(len(mockSigner.storageListItems), Equals, 5)
c.Assert(len(mockSigner.processed), Equals, 0)
// release lock and all signings should complete
mockSigner.Unlock()
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 3)
c.Assert(len(mockSigner.processed), Equals, 2)
// the remaining signings should be the last 3 items
c.Assert(mockSigner.storageListItems[0].TxOutItem.Equals(tosis[2].TxOutItem), Equals, true)
c.Assert(mockSigner.storageListItems[1].TxOutItem.Equals(tosis[3].TxOutItem), Equals, true)
c.Assert(mockSigner.storageListItems[2].TxOutItem.Equals(tosis[4].TxOutItem), Equals, true)
// complete 2 more signings
pipeline.SpawnSignings(mockSigner, bridge)
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.processed), Equals, 4)
c.Assert(len(mockSigner.storageListItems), Equals, 1)
// the remaining signing should be the last item
c.Assert(mockSigner.storageListItems[0].TxOutItem.Equals(tosis[4].TxOutItem), Equals, true)
// finish
pipeline.SpawnSignings(mockSigner, bridge)
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.processed), Equals, 5)
c.Assert(len(mockSigner.storageListItems), Equals, 0)
}
func (s *PipelineSuite) TestPipelineConcurrentLimitedStop(c *C) {
pipeline, err := newPipeline(2)
c.Assert(pipeline, NotNil)
c.Assert(err, IsNil)
// mocks
bridge := fakeBridge{nil}
mockSigner := &mockPipelineSigner{
storageListItems: append([]TxOutStoreItem{}, tosis...),
}
// spawn signings - 4/5 test data should be concurrent, but semaphore limits to 2
mockSigner.Lock()
pipeline.SpawnSignings(mockSigner, bridge)
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 2)
c.Assert(len(mockSigner.storageListItems), Equals, 5)
c.Assert(len(mockSigner.processed), Equals, 0)
// stop the signer
mockSigner.stopped = true
// release lock and all signings should complete
mockSigner.Unlock()
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 3)
c.Assert(len(mockSigner.processed), Equals, 2)
// the remaining signings should be the last 3 items
c.Assert(mockSigner.storageListItems[0].TxOutItem.Equals(tosis[2].TxOutItem), Equals, true)
c.Assert(mockSigner.storageListItems[1].TxOutItem.Equals(tosis[3].TxOutItem), Equals, true)
c.Assert(mockSigner.storageListItems[2].TxOutItem.Equals(tosis[4].TxOutItem), Equals, true)
// no more signings should be spawned
pipeline.SpawnSignings(mockSigner, bridge)
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 3)
c.Assert(len(mockSigner.processed), Equals, 2)
}
func (s *PipelineSuite) TestPipelineRound7Retry(c *C) {
s.testPipelineRetry(c, true, false)
}
func (s *PipelineSuite) TestPipelineBroadcastRetry(c *C) {
s.testPipelineRetry(c, false, true)
}
func (s *PipelineSuite) testPipelineRetry(c *C, round7, broadcast bool) {
pipeline, err := newPipeline(1)
c.Assert(pipeline, NotNil)
c.Assert(err, IsNil)
// use a copy of the test data so we can modify it
retryTosis := append([]TxOutStoreItem{}, tosis...)
// multiple items in retry for one vault/chain should be skipped
if round7 {
retryTosis[1].Round7Retry = true
retryTosis[2].Round7Retry = true
}
if broadcast {
retryTosis[1].SignedTx = []byte("broadcast")
retryTosis[2].SignedTx = []byte("broadcast")
}
// mocks
bridge := fakeBridge{nil}
mockSigner := &mockPipelineSigner{
storageListItems: append([]TxOutStoreItem{}, retryTosis...),
}
// spawn signings - the first item should process since retry items are skipped
mockSigner.Lock()
pipeline.SpawnSignings(mockSigner, bridge)
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 1)
c.Assert(len(mockSigner.storageListItems), Equals, 5)
c.Assert(len(mockSigner.processed), Equals, 0)
// release lock and signing should complete
mockSigner.Unlock()
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 4)
c.Assert(len(mockSigner.processed), Equals, 1)
// the first item should have been the first one processed
c.Assert(mockSigner.processed[0].TxOutItem.Equals(retryTosis[0].TxOutItem), Equals, true)
// this time only 1 item for vault/chain should be in retry so it should process first
if round7 {
retryTosis[1].Round7Retry = false
}
if broadcast {
retryTosis[1].SignedTx = nil
}
mockSigner = &mockPipelineSigner{
storageListItems: append([]TxOutStoreItem{}, retryTosis...),
}
// spawn signings - only the retry item should be started
mockSigner.Lock()
pipeline.SpawnSignings(mockSigner, bridge)
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 1)
c.Assert(len(mockSigner.storageListItems), Equals, 5)
c.Assert(len(mockSigner.processed), Equals, 0)
// release lock and signing should complete
mockSigner.Unlock()
pipeline.Wait()
c.Assert(len(pipeline.vaultStatusConcurrency[ttypes.VaultStatus_ActiveVault]), Equals, 0)
c.Assert(len(mockSigner.storageListItems), Equals, 4)
c.Assert(len(mockSigner.processed), Equals, 1)
// the retry item should have been the first one processed
c.Assert(mockSigner.processed[0].TxOutItem.Equals(retryTosis[2].TxOutItem), Equals, true)
}
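These tests lean on the vault/chain lock in SpawnSignings: a one-slot channel per (vault, chain) key lets the pipeline both take the lock and cheaply see whether a previous routine still holds it. The standalone sketch below shows that check-then-lock pattern with plain string keys; as in the pipeline, checking the channel length is only safe because a single goroutine does the checking and spawning. It is illustrative only and not part of the MR.

package main

import "fmt"

func main() {
    // one-slot channel per key; a non-empty channel means "locked"
    locks := map[string]chan struct{}{}

    tryLock := func(key string) bool {
        if _, ok := locks[key]; !ok {
            locks[key] = make(chan struct{}, 1)
        }
        if len(locks[key]) > 0 {
            return false // a signing routine for this vault/chain is still running
        }
        locks[key] <- struct{}{}
        return true
    }
    unlock := func(key string) { <-locks[key] }

    fmt.Println(tryLock("vault1/BTC")) // true: first signing for this vault/chain starts
    fmt.Println(tryLock("vault1/BTC")) // false: a second item for the same vault/chain is skipped
    fmt.Println(tryLock("vault2/BTC")) // true: a different vault may sign concurrently

    unlock("vault1/BTC")
    fmt.Println(tryLock("vault1/BTC")) // true: available again once the first routine released it
}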
@@ -50,6 +50,7 @@ type Signer struct {
     localPubKey common.PubKey
     tssKeysignMetricMgr *metrics.TssKeysignMetricMgr
     observer *observer.Observer
+    pipeline *pipeline
 }

 // NewSigner create a new instance of signer
@@ -81,7 +82,7 @@ func NewSigner(cfg config.BifrostSignerConfiguration,
         break
     }
     time.Sleep(constants.ThorchainBlockTime)
-    fmt.Println("Waiting for node account to be registered...")
+    log.Info().Msg("Waiting for node account to be registered...")
 }
 for _, item := range na.GetSignerMembership() {
     pubkeyMgr.AddPubKey(item, true)
@@ -202,6 +203,57 @@ func runWithContext(ctx context.Context, fn func() ([]byte, *types.TxInItem, err
 }

 func (s *Signer) processTransactions() {
+    signerConcurrency, err := s.thorchainBridge.GetMimir(constants.SignerConcurrency.String())
+    if err != nil {
+        s.logger.Error().Err(err).Msg("fail to get signer concurrency mimir")
+        return
+    }
+
+    // TODO: this forces new behavior in mocknet, remove after v1 logic is deprecated
+    if signerConcurrency <= 0 {
+        signerConcurrencyEnv := os.Getenv("BIFROST_SIGNER_CONCURRENCY")
+        if signerConcurrencyEnv != "" {
+            signerConcurrency, err = strconv.ParseInt(signerConcurrencyEnv, 10, 64)
+            if err != nil {
+                s.logger.Error().Err(err).Msg("fail to parse BIFROST_SIGNER_CONCURRENCY")
+                return
+            }
+        }
+    }
+
+    // if unset use v1 logic
+    if signerConcurrency <= 0 {
+        // if previously set, wait for any running signings
+        if s.pipeline != nil {
+            s.pipeline.Wait()
+            s.pipeline = nil
+        }
+        s.processTransactionsV1()
+        return
+    }
+
+    // if previously set to different concurrency, drain existing signings
+    if s.pipeline != nil && s.pipeline.concurrency != signerConcurrency {
+        s.pipeline.Wait()
+        s.pipeline = nil
+    }
+
+    // if not set, or set to different concurrency, create new pipeline
+    if s.pipeline == nil {
+        s.pipeline, err = newPipeline(signerConcurrency)
+        if err != nil {
+            s.logger.Error().Err(err).Msg("fail to create new pipeline")
+            return
+        }
+    }
+
+    // process transactions
+    s.pipeline.SpawnSignings(s, s.thorchainBridge)
+}
+
+func (s *Signer) processTransactionsV1() {
     wg := &sync.WaitGroup{}
     for _, items := range s.storage.OrderedLists() {
         wg.Add(1)
@@ -243,7 +295,13 @@ func (s *Signer) processTransactions() {
         continue
     }
-    s.logger.Info().Int("num", i).Int64("height", item.Height).Int("status", int(item.Status)).Interface("tx", item.TxOutItem).Msgf("Signing transaction")
+    s.logger.Info().
+        Int("num", i).
+        Int64("height", item.Height).
+        Int("status", int(item.Status)).
+        Interface("tx", item.TxOutItem).
+        Msg("Signing transaction")
     // a single keysign should not take longer than 5 minutes , regardless TSS or local
     ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
     checkpoint, obs, err := runWithContext(ctx, func() ([]byte, *types.TxInItem, error) {
@@ -256,8 +314,8 @@ func (s *Signer) processTransactions() {
         s.logger.Error().Err(err).Interface("tx", item.TxOutItem).Msg("round 7 signing error")
         item.Round7Retry = true
         item.Checkpoint = checkpoint
-        if setErr := s.storage.Set(item); err != nil {
-            s.logger.Error().Err(setErr).Msg("fail to update tx out store item with round 7 retry")
+        if storeErr := s.storage.Set(item); storeErr != nil {
+            s.logger.Error().Err(storeErr).Msg("fail to update tx out store item with round 7 retry")
         }
     }
@@ -346,7 +404,7 @@ func (s *Signer) processKeygen(ch <-chan ttypes.KeygenBlock) {
     }
 }

-func (s *Signer) scheduleRetry(keygenBlock ttypes.KeygenBlock) bool {
+func (s *Signer) scheduleKeygenRetry(keygenBlock ttypes.KeygenBlock) bool {
     churnRetryInterval, err := s.thorchainBridge.GetMimir(constants.ChurnRetryInterval.String())
     if err != nil {
         s.logger.Error().Err(err).Msg("fail to get churn retry mimir")
@@ -438,7 +496,7 @@ func (s *Signer) processKeygenBlock(keygenBlock ttypes.KeygenBlock) {
     // re-enqueue the keygen block to retry if we failed to generate a key
     if pubKey.Secp256k1.IsEmpty() {
-        if s.scheduleRetry(keygenBlock) {
+        if s.scheduleKeygenRetry(keygenBlock) {
             return
         }
         s.logger.Error().Interface("keygenBlock", keygenBlock).Msg("done with keygen retries")
@@ -637,7 +695,7 @@ func (s *Signer) signAndBroadcast(item TxOutStoreItem) ([]byte, *types.TxInItem,
     // store the signed tx for the next retry
     item.SignedTx = signedTx
-    if storeErr := s.storage.Set(item); err != nil {
+    if storeErr := s.storage.Set(item); storeErr != nil {
         s.logger.Error().Err(storeErr).Msg("fail to update tx out store item with signed tx")
     }
@@ -668,3 +726,79 @@ func (s *Signer) Stop() error {
     s.blockScanner.Stop()
     return s.storage.Close()
 }
+
+////////////////////////////////////////////////////////////////////////////////////////
+// pipelineSigner Interface
+////////////////////////////////////////////////////////////////////////////////////////
+
+func (s *Signer) isStopped() bool {
+    select {
+    case <-s.stopChan:
+        return true
+    default:
+        return false
+    }
+}
+
+func (s *Signer) storageList() []TxOutStoreItem {
+    return s.storage.List()
+}
+
+func (s *Signer) processTransaction(item TxOutStoreItem) {
+    s.logger.Info().
+        Int64("height", item.Height).
+        Int("status", int(item.Status)).
+        Interface("tx", item.TxOutItem).
+        Msg("Signing transaction")
+
+    // a single keysign should not take longer than 5 minutes , regardless TSS or local
+    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+    checkpoint, obs, err := runWithContext(ctx, func() ([]byte, *types.TxInItem, error) {
+        return s.signAndBroadcast(item)
+    })
+    if err != nil {
+        // mark the txout on round 7 failure to block other txs for the chain / pubkey
+        ksErr := tss.KeysignError{}
+        if errors.As(err, &ksErr) && ksErr.IsRound7() {
+            s.logger.Error().Err(err).Interface("tx", item.TxOutItem).Msg("round 7 signing error")
+            item.Round7Retry = true
+            item.Checkpoint = checkpoint
+            if storeErr := s.storage.Set(item); storeErr != nil {
+                s.logger.Error().Err(storeErr).Msg("fail to update tx out store item with round 7 retry")
+            }
+        }
+        if errors.Is(err, context.DeadlineExceeded) {
+            panic(fmt.Errorf("tx out item: %+v , keysign timeout : %w", item.TxOutItem, err))
+        }
+        s.logger.Error().Err(err).Msg("fail to sign and broadcast tx out store item")
+        cancel()
+        return
// The 'item' for loop should not be items[0],
// because problems which return 'nil, nil' should be skipped over instead of blocking others.
// When signAndBroadcast returns an error (such as from a keysign timeout),
// a 'return' and not a 'continue' should be used so that nodes can all restart the list,
// for when the keysign failure was from a loss of list synchrony.
// Otherwise, out-of-sync lists would cycle one timeout at a time, maybe never resynchronising.
}
cancel()
// if enabled and the observation is non-nil, instant observe the outbound
if s.cfg.AutoObserve && obs != nil {
s.observer.ObserveSigned(types.TxIn{
Count: "1",
Chain: item.TxOutItem.Chain,
TxArray: []types.TxInItem{*obs},
MemPool: true,
Filtered: true,
SentUnFinalised: false,
Finalised: false,
ConfirmationRequired: 0,
}, item.TxOutItem.Chain.IsEVM()) // Instant EVM observations have wrong gas and need future correct observations
}
// We have a successful broadcast! Remove the item from our store
if err = s.storage.Remove(item); err != nil {
s.logger.Error().Err(err).Msg("fail to update tx out store item")
}
}
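The pipeline referenced above — newPipeline, Wait, SpawnSignings, and the concurrency field — lives in a separate pipeline.go that is not part of this excerpt. Below is a minimal sketch, assuming the surrounding signer package types and imports (sync, fmt, thorclient), of how those calls could fit together; the pipelineSigner interface shape, the vault/chain grouping key, and the semaphore-based bound are assumptions drawn from the call sites and from the test comment "only processes 1 per vault/chain in the pipeline", not the committed implementation.

// Sketch only: a bounded worker pool satisfying the calls made from
// processTransactions above. Assumes the surrounding signer package types
// (TxOutStoreItem) and the thorclient.ThorchainBridge interface.
type pipelineSigner interface {
	isStopped() bool
	storageList() []TxOutStoreItem
	processTransaction(item TxOutStoreItem)
}

type pipeline struct {
	concurrency int64
	sem         chan struct{} // bounds in-flight signings (assumption)
	wg          sync.WaitGroup
}

func newPipeline(concurrency int64) (*pipeline, error) {
	if concurrency < 1 {
		return nil, fmt.Errorf("invalid signer concurrency: %d", concurrency)
	}
	return &pipeline{
		concurrency: concurrency,
		sem:         make(chan struct{}, concurrency),
	}, nil
}

// Wait blocks until every signing spawned so far has finished.
func (p *pipeline) Wait() {
	p.wg.Wait()
}

// SpawnSignings picks at most one pending item per vault/chain pair
// (assumption, consistent with the test expectations) and signs the
// selected items concurrently, bounded by the semaphore.
func (p *pipeline) SpawnSignings(signer pipelineSigner, bridge thorclient.ThorchainBridge) {
	_ = bridge // the real implementation may consult the bridge (e.g. GetVault); assumption
	picked := make(map[string]TxOutStoreItem)
	for _, item := range signer.storageList() {
		key := item.TxOutItem.VaultPubKey.String() + "/" + item.TxOutItem.Chain.String()
		if _, ok := picked[key]; !ok {
			picked[key] = item
		}
	}
	for _, item := range picked {
		if signer.isStopped() {
			break
		}
		p.wg.Add(1)
		p.sem <- struct{}{}
		go func(it TxOutStoreItem) {
			defer p.wg.Done()
			defer func() { <-p.sem }()
			signer.processTransaction(it)
		}(item)
	}
}

This also shows why each sign.processTransactions() call in the tests is now followed by sign.pipeline.Wait(): SpawnSignings returns as soon as the goroutines are started, and the explicit Wait keeps the assertions deterministic.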
...@@ -12,7 +12,6 @@ import (
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/blang/semver"
...@@ -71,9 +70,23 @@ func (b fakeBridge) GetMimir(key string) (int64, error) {
if strings.HasPrefix(key, "HALT") {
return 0, nil
}
if key == constants.SignerConcurrency.String() {
return 3, nil
}
panic("not implemented")
}
func (b fakeBridge) GetVault(pubkey string) (types2.Vault, error) {
pk, err := common.NewPubKey(pubkey)
if err != nil {
return types2.Vault{}, err
}
return types2.Vault{
PubKey: pk,
Status: types2.VaultStatus_ActiveVault,
}, nil
}
// -------------------------------- tss ---------------------------------
type fakeTssServer struct {
...@@ -275,8 +288,6 @@ func (b *MockChainClient) GetConfirmationCount(txIn types.TxIn) int64 {
// Tests
////////////////////////////////////////////////////////////////////////////////////////
func TestPackage(t *testing.T) { TestingT(t) }
var m *metrics.Metrics
func GetMetricForTest(c *C) *metrics.Metrics {
...@@ -465,7 +476,7 @@ func (s *SignSuite) TestProcess(c *C) {
}
func (s *SignSuite) TestBroadcastRetry(c *C) {
vaultPubKey, err := common.NewPubKey(pubkeymanager.MockPubkey)
c.Assert(err, IsNil)
// start a mock keysign
...@@ -499,7 +510,7 @@ func (s *SignSuite) TestBroadcastRetry(c *C) {
Chain: common.BNBChain,
ToAddress: "tbnb1yycn4mh6ffwpjf584t8lpp7c27ghu03gpvqkfj",
Memo: msg,
VaultPubKey: vaultPubKey,
Coins: common.Coins{ // must be set or signer overrides memo
common.NewCoin(common.BNBAsset, cosmos.NewUint(1000000)),
},
...@@ -509,6 +520,7 @@ func (s *SignSuite) TestBroadcastRetry(c *C) {
// first attempt should fail broadcast and set signed tx
sign.processTransactions()
sign.pipeline.Wait()
c.Assert(cc.signCount, Equals, 1)
c.Assert(tssServer.counter, Equals, 1)
c.Assert(cc.broadcastCount, Equals, 1)
...@@ -520,6 +532,7 @@ func (s *SignSuite) TestBroadcastRetry(c *C) {
// second attempt should not sign and still fail broadcast
sign.processTransactions()
sign.pipeline.Wait()
c.Assert(cc.signCount, Equals, 1)
c.Assert(tssServer.counter, Equals, 1)
c.Assert(cc.broadcastCount, Equals, 2)
...@@ -531,6 +544,7 @@ func (s *SignSuite) TestBroadcastRetry(c *C) {
// third attempt should not sign and succeed broadcast
sign.processTransactions()
sign.pipeline.Wait()
c.Assert(cc.signCount, Equals, 1)
c.Assert(tssServer.counter, Equals, 1)
c.Assert(cc.broadcastCount, Equals, 3)
...@@ -545,7 +559,7 @@ func (s *SignSuite) TestBroadcastRetry(c *C) {
}
func (s *SignSuite) TestRound7Retry(c *C) {
vaultPubKey, err := common.NewPubKey(pubkeymanager.MockPubkey)
c.Assert(err, IsNil)
// start a mock keysign, succeeds on 5th try
...@@ -579,7 +593,7 @@ func (s *SignSuite) TestRound7Retry(c *C) {
Chain: common.BNBChain,
ToAddress: "tbnb1yycn4mh6ffwpjf584t8lpp7c27ghu03gpvqkfj",
Memo: msg,
VaultPubKey: vaultPubKey,
Coins: common.Coins{ // must be set or signer overrides memo
common.NewCoin(common.BNBAsset, cosmos.NewUint(1000000)),
},
...@@ -591,7 +605,7 @@ func (s *SignSuite) TestRound7Retry(c *C) {
Chain: common.BNBChain,
ToAddress: "tbnb145wcuncewfkuc4v6an0r9laswejygcul43c3wu",
Memo: msg,
VaultPubKey: vaultPubKey,
Coins: common.Coins{ // must be set or signer overrides memo
common.NewCoin(common.BNBAsset, cosmos.NewUint(1000000)),
},
...@@ -603,7 +617,7 @@ func (s *SignSuite) TestRound7Retry(c *C) {
Chain: common.BNBChain,
ToAddress: "tbnb1yxfyeda8pnlxlmx0z3cwx74w9xevspwdpzdxpj",
Memo: msg,
VaultPubKey: vaultPubKey,
Coins: common.Coins{ // must be set or signer overrides memo
common.NewCoin(common.BNBAsset, cosmos.NewUint(1000000)),
},
...@@ -638,7 +652,7 @@ func (s *SignSuite) TestRound7Retry(c *C) {
TxOutItem: types.TxOutItem{
Chain: common.BTCChain,
ToAddress: "tbtc1yycn4mh6ffwpjf584t8lpp7c27ghu03gpvqkfj",
VaultPubKey: vaultPubKey,
Memo: msg2,
Coins: common.Coins{
common.NewCoin(common.BTCAsset, cosmos.NewUint(1000000)),
...@@ -650,6 +664,7 @@ func (s *SignSuite) TestRound7Retry(c *C) {
// first round only btc tx should go through
sign.processTransactions()
sign.pipeline.Wait()
c.Assert(cc.signCount, Equals, 1)
c.Assert(cc.broadcastCount, Equals, 0)
c.Assert(tssServer.counter, Equals, 1)
...@@ -669,6 +684,7 @@ func (s *SignSuite) TestRound7Retry(c *C) {
cc.assertCheckpoint = true // the following signs should pass checkpoint
for i := 0; i < 3; i++ {
sign.processTransactions()
sign.pipeline.Wait()
}
// first bnb tx should have been retried 3 times, no broadcast yet
...@@ -678,6 +694,7 @@ func (s *SignSuite) TestRound7Retry(c *C) {
// this round should sign and broadcast the round 7 retry
sign.processTransactions()
sign.pipeline.Wait()
c.Assert(cc.signCount, Equals, 5)
c.Assert(cc.broadcastCount, Equals, 1)
c.Assert(tssServer.counter, Equals, 5)
...@@ -686,9 +703,21 @@ func (s *SignSuite) TestRound7Retry(c *C) {
c.Assert(tois[0].Round7Retry, Equals, false)
c.Assert(tois[1].Round7Retry, Equals, false)
// the next 2 rounds should sign and broadcast the remaining
cc.assertCheckpoint = false // the following signs should not pass checkpoint
// only processes 1 per vault/chain in the pipeline
sign.processTransactions()
sign.pipeline.Wait()
c.Assert(cc.signCount, Equals, 6)
c.Assert(cc.broadcastCount, Equals, 2)
c.Assert(tssServer.counter, Equals, 6)
c.Assert(len(sign.storage.List()), Equals, 1)
// last one
sign.processTransactions()
sign.pipeline.Wait()
c.Assert(cc.signCount, Equals, 7)
c.Assert(cc.broadcastCount, Equals, 3)
c.Assert(tssServer.counter, Equals, 7)
...@@ -697,6 +726,7 @@ func (s *SignSuite) TestRound7Retry(c *C) {
// nothing more should have happened on btc
for i := 0; i < 3; i++ {
sign.processTransactions()
sign.pipeline.Wait()
}
c.Assert(cc.signCount, Equals, 7)
c.Assert(cc.broadcastCount, Equals, 3)
......
...@@ -47,6 +47,7 @@ const (
NodeAccountEndpoint = "/thorchain/node"
SignerMembershipEndpoint = "/thorchain/vaults/%s/signers"
StatusEndpoint = "/status"
VaultEndpoint = "/thorchain/vault/%s"
AsgardVault = "/thorchain/vaults/asgard"
PubKeysEndpoint = "/thorchain/vaults/pubkeys"
ThorchainConstants = "/thorchain/constants"
...@@ -77,6 +78,7 @@ type ThorchainBridge interface {
EnsureNodeWhitelistedWithTimeout() error
FetchNodeStatus() (stypes.NodeStatus, error)
GetAsgards() (stypes.Vaults, error)
GetVault(pubkey string) (stypes.Vault, error)
GetConfig() config.BifrostClientConfiguration
GetConstants() (map[string]int64, error)
GetContext() client.Context
...@@ -534,6 +536,22 @@ func (b *thorchainBridge) GetAsgards() (stypes.Vaults, error) {
return vaults, nil
}
// GetVault retrieves a specific vault from thorchain.
func (b *thorchainBridge) GetVault(pubkey string) (stypes.Vault, error) {
buf, s, err := b.getWithPath(fmt.Sprintf(VaultEndpoint, pubkey))
if err != nil {
return stypes.Vault{}, fmt.Errorf("fail to get vault: %w", err)
}
if s != http.StatusOK {
return stypes.Vault{}, fmt.Errorf("unexpected status code %d", s)
}
var vault stypes.Vault
if err = json.Unmarshal(buf, &vault); err != nil {
return stypes.Vault{}, fmt.Errorf("fail to unmarshal vault from json: %w", err)
}
return vault, nil
}
func (b *thorchainBridge) getVaultPubkeys() ([]byte, error) {
buf, s, err := b.getWithPath(PubKeysEndpoint)
if err != nil {
......
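The new GetVault endpoint lets Bifrost look up a single vault by public key instead of fetching all asgards. How the signer consumes it is not shown in this diff; the helper below is only a hypothetical illustration of a caller-side status check, and its name and the exact set of accepted statuses are assumptions.

// vaultCanSign is a hypothetical helper (not part of this change) showing how
// a caller could use ThorchainBridge.GetVault before attempting a keysign.
func vaultCanSign(bridge thorclient.ThorchainBridge, pubkey common.PubKey) (bool, error) {
	vault, err := bridge.GetVault(pubkey.String())
	if err != nil {
		return false, fmt.Errorf("fail to get vault %s: %w", pubkey, err)
	}
	// Assumption: only active or retiring vaults should still sign outbounds.
	ok := vault.Status == stypes.VaultStatus_ActiveVault ||
		vault.Status == stypes.VaultStatus_RetiringVault
	return ok, nil
}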
...@@ -177,6 +177,8 @@ services:
BIFROST_CHAINS_LTC_UTXO_CLIENT_V2: "true"
BIFROST_CHAINS_BTC_UTXO_CLIENT_V2: "true"
BIFROST_SIGNER_CONCURRENCY: "10"
ports:
- 5040:5040
- 6040:6040
......
...@@ -11,6 +11,7 @@ var (
avaxTokenListV95 EVMTokenList
avaxTokenListV101 EVMTokenList
avaxTokenListV126 EVMTokenList
avaxTokenListV127 EVMTokenList
)
func init() {
...@@ -23,10 +24,15 @@ func init() {
if err := json.Unmarshal(avaxtokens.AVAXTokenListRawV126, &avaxTokenListV126); err != nil {
panic(err)
}
if err := json.Unmarshal(avaxtokens.AVAXTokenListRawV127, &avaxTokenListV127); err != nil {
panic(err)
}
}
func GetAVAXTokenList(version semver.Version) EVMTokenList {
switch {
case version.GTE(semver.MustParse("1.127.0")):
return avaxTokenListV127
case version.GTE(semver.MustParse("1.126.0")): case version.GTE(semver.MustParse("1.126.0")):
return avaxTokenListV126 return avaxTokenListV126
case version.GTE(semver.MustParse("1.101.0")): case version.GTE(semver.MustParse("1.101.0")):
......
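For reference, the version gate means nodes keep serving the V126 list until they report 1.127.0, at which point the new list (including the SOL and ALOT entries added below) takes effect. A small illustrative call, using the semver values from the switch above:

// Illustrative only: resolving the token list for two hypothetical node versions.
listOld := GetAVAXTokenList(semver.MustParse("1.126.5")) // still the V126 list
listNew := GetAVAXTokenList(semver.MustParse("1.127.0")) // picks up avax_mainnet_latest.json via V127
_ = listOld
_ = listNew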
...@@ -13,5 +13,8 @@ var AVAXTokenListRawV95 []byte
//go:embed avax_mainnet_V101.json
var AVAXTokenListRawV101 []byte
//go:embed avax_mainnet_V126.json
var AVAXTokenListRawV126 []byte
//go:embed avax_mainnet_latest.json
var AVAXTokenListRawV127 []byte
...@@ -679,6 +679,24 @@
"name": "Bitcoin",
"symbol": "BTC.b",
"logoURI": "https://raw.githubusercontent.com/traderjoe-xyz/joe-tokenlists/main/logos/0x152b9d0FdC40C096757F570A51E494bd4b943E50/logo.png"
},
{
"chainId": 43114,
"address": "0xFE6B19286885a4F7F55AdAD09C3Cd1f906D2478F",
"decimals": 9,
"name": "Wrapped SOL (Wormhole)",
"symbol": "SOL",
"tags": ["DeFi"],
"logoURI": "https://raw.githubusercontent.com/traderjoe-xyz/joe-tokenlists/main/logos/0xFE6B19286885a4F7F55AdAD09C3Cd1f906D2478F/logo.png"
},
{
"chainId": 43114,
"address": "0x093783055f9047c2bff99c4e414501f8a147bc69",
"decimals": 18,
"name": "Dexalot Token",
"symbol": "ALOT",
"tags": ["DeFi"],
"logoURI": "https://raw.githubusercontent.com/traderjoe-xyz/joe-tokenlists/main/logos/0x093783055f9047c2bff99c4e414501f8a147bc69/logo.png"
}
],
"timestamp": "2021-06-21T00:00:00+00:00"
......