Skip to content
Snippets Groups Projects
Verified Commit 052f3020 authored by Ethan Reesor's avatar Ethan Reesor
Browse files

Better dispatch errors

parent 45c24ff5
No related branches found
No related tags found
Loading
Pipeline #1096705013 failed
......@@ -90,9 +90,9 @@ var errTxInCache2 = jsonrpc2.NewError(jsonrpc2.ErrorCode(errTxInCache1.Code), er
var errTxInCacheAcc = jsonrpc2.NewError(v2.ErrCodeAccumulate, "Accumulate Error", errTxInCache1.Data)
// CheckDispatchError ignores errors we don't care about.
func CheckDispatchError(err error, errs chan<- error) {
func CheckDispatchError(err error) error {
if err == nil {
return
return nil
}
// TODO This may be unnecessary once this issue is fixed:
......@@ -100,30 +100,30 @@ func CheckDispatchError(err error, errs chan<- error) {
// Is the error "tx already exists in cache"?
if err.Error() == mempool.ErrTxInCache.Error() {
return
return nil
}
// Or RPC error "tx already exists in cache"?
var rpcErr1 *jrpc.RPCError
if errors.As(err, &rpcErr1) && *rpcErr1 == *errTxInCache1 {
return
return nil
}
var rpcErr2 jsonrpc2.Error
if errors.As(err, &rpcErr2) && (rpcErr2 == errTxInCache2 || rpcErr2 == errTxInCacheAcc) {
return
return nil
}
var errorsErr *errors.Error
if errors.As(err, &errorsErr) {
// This probably should not be necessary
if errorsErr.Code == errors.Delivered {
return
return nil
}
}
// It's a real error
errs <- err
return err
}
// Send sends all of the batches asynchronously using one connection per
......@@ -154,6 +154,19 @@ func (d *dispatcher) send(ctx context.Context, queue map[string][]*messaging.Env
}
clients := d.getClients(ctx, want)
check := func(err error, peer *coretypes.Peer) {
err = CheckDispatchError(err)
if err != nil {
return
}
if peer == nil {
errs <- errors.UnknownError.WithFormat("local: %w", err)
} else {
errs <- errors.UnknownError.WithFormat("peer %v (%v): %w", peer.NodeInfo.ID(), peer.RemoteIP, err)
}
}
for part, queue := range queue {
mDispatchEnvelopes.Add(float64(len(queue)))
......@@ -174,27 +187,32 @@ func (d *dispatcher) send(ctx context.Context, queue map[string][]*messaging.Env
subs, err := submitter.Submit(ctx, env, api.SubmitOptions{})
if err != nil {
mDispatchErrors.Inc()
CheckDispatchError(err, errs)
check(err, client.peer)
continue
}
// Check for failed submissions
for _, sub := range subs {
if sub.Status != nil {
CheckDispatchError(sub.Status.AsError(), errs)
check(sub.Status.AsError(), client.peer)
}
}
}
}
}
type peerClient struct {
DispatcherClient
peer *coretypes.Peer
}
// getClients returns a map of Tendermint RPC clients for the given partitions.
func (d *dispatcher) getClients(ctx context.Context, want map[string]bool) map[string]DispatcherClient {
clients := make(map[string]DispatcherClient, len(want))
func (d *dispatcher) getClients(ctx context.Context, want map[string]bool) map[string]peerClient {
clients := make(map[string]peerClient, len(want))
// Prefer local clients
for part, client := range d.self {
clients[part] = client
clients[part] = peerClient{DispatcherClient: client}
delete(want, part)
}
......@@ -242,7 +260,7 @@ func (d *dispatcher) getClients(ctx context.Context, want map[string]bool) map[s
part = strings.ToLower(part)
if want[part] {
delete(want, part)
clients[part] = bvn
clients[part] = peerClient{bvn, &peer}
}
// If we have everything we want, we're done
......
......@@ -75,8 +75,16 @@ func (d *dispatcher) Send(ctx context.Context) <-chan error {
messages := d.messages
d.messages = nil
// Run asynchronously
errs := make(chan error)
check := func(err error) {
err = tendermint.CheckDispatchError(err)
if err != nil {
return
}
errs <- err
}
// Run asynchronously
go func() {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
......@@ -94,14 +102,14 @@ func (d *dispatcher) Send(ctx context.Context) <-chan error {
switch res := res.(type) {
case *message.ErrorResponse:
// Handle error
tendermint.CheckDispatchError(res.Error, errs)
check(res.Error)
return nil
case *message.SubmitResponse:
// Check for failed submissions
for _, sub := range res.Value {
if sub.Status != nil {
tendermint.CheckDispatchError(sub.Status.AsError(), errs)
check(sub.Status.AsError())
}
}
return nil
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment