gitlab-elasticsearch-indexer should flush bulk index requests to avoid overlimit errors
Background
Related: https://gitlab.com/gitlab-com/request-for-help/-/issues/2553#note_2450687212
From the discussion thread above:
Related post: https://stackoverflow.com/questions/69686441/what-is-the-amazon-opensearch-equivalent-for-http-max-content-length-and-can-i-i
AWS OpenSearch enforces one of two maximum bulk request sizes, depending on instance type: 10 MiB (most common) or 100 MiB.
Our troubleshooting docs recommend lowering the bulk size to 9 MB on AWS to avoid requests going over the limit.
Debugging
Bulk indexing is done in two places for advanced search:

- Gitlab::Elastic::BulkIndexer
- Gitlab::Elastic::Indexer (called by CommitIndexerWorker), which runs the gitlab-elasticsearch-indexer
The Rails side controls the bulk size and expresses it in MiB. The max bulk size is passed to the gitlab-elasticsearch-indexer, which also has its own default (also in MiB) as a fallback.
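As an illustration of that handoff, here is a minimal sketch of how the indexer side might resolve the limit, falling back to a built-in default when no value is passed. The GITLAB_INDEXER_MAX_BULK_SIZE_MIB variable name and the 10 MiB default are assumptions for this sketch, not the indexer's actual configuration mechanism:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// defaultMaxBulkSizeMiB mirrors the indexer's built-in fallback; the
// actual default in gitlab-elasticsearch-indexer may differ.
const defaultMaxBulkSizeMiB = 10

// maxBulkSizeBytes resolves the bulk size limit in bytes. The
// GITLAB_INDEXER_MAX_BULK_SIZE_MIB name is hypothetical and only
// illustrates the Rails-to-indexer handoff.
func maxBulkSizeBytes() int {
	mib := defaultMaxBulkSizeMiB
	if v, ok := os.LookupEnv("GITLAB_INDEXER_MAX_BULK_SIZE_MIB"); ok {
		if n, err := strconv.Atoi(v); err == nil && n > 0 {
			mib = n
		}
	}
	return mib * 1024 * 1024 // MiB, matching the units used on the Rails side
}

func main() {
	fmt.Printf("max bulk size: %d bytes\n", maxBulkSizeBytes())
}
```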
The problem occurs when adding a document to a bulk request takes the total request size over the limit (for example, 8 MiB of data is already queued for indexing and the next document is 2.5 MiB, which pushes the request just past a 10 MiB limit).
Proposal
The Gitlab::Elastic::BulkIndexer already keeps track of the size of the queued items and calls flush if the next item would take the request over the limit.
The gitlab-elasticsearch-indexer should do the same tracking and perform the check in its Index method, as in the sketch below.
Proposal from Duo:
```go
import (
	"encoding/json"
	"fmt"
	"sync"

	logkit "gitlab.com/gitlab-org/labkit/log"
)

type Client struct {
	// ... existing fields
	currentBatchSize int
	mu               sync.Mutex // guards currentBatchSize
}

func (c *Client) Index(documentType, id string, thing interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Estimate the serialized document size, with a rough allowance
	// for the bulk action metadata line.
	jsonBytes, err := json.Marshal(thing)
	if err != nil {
		logkit.WithError(err).Error("failed to marshal document for size estimate")
		return
	}
	docSize := len(jsonBytes) + 500 // estimated metadata overhead

	// Flush first if adding this document would push the batch over the limit.
	if c.currentBatchSize+docSize > c.maxBulkSize {
		logkit.Debug("flushing bulk processor: next document would exceed max bulk size")
		if err := c.Flush(); err != nil { // Flush resets currentBatchSize
			logkit.WithError(err).Error("bulk flush failed")
		}
	}

	// Track the document's size, then continue with the existing logic.
	c.currentBatchSize += docSize
	// ... rest of existing Index logic
}

// Flush assumes the caller holds c.mu (as Index does) or otherwise
// serializes access to currentBatchSize.
func (c *Client) Flush() error {
	err := c.bulk.Flush()
	c.currentBatchSize = 0 // reset the counter after every flush attempt
	if err == nil && c.bulkFailed {
		err = fmt.Errorf("failed to perform all operations")
	}
	return err
}
```
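For a concrete picture of the intended behavior, here is a minimal, self-contained sketch of the same flush-before-overflow pattern using the numbers from the Debugging section. The sizeBatcher type is invented for this example and is not part of the indexer; flush stands in for the bulk processor's Flush:

```go
package main

import "fmt"

// sizeBatcher is a standalone model of the flush-before-overflow check,
// independent of the indexer's real Client type.
type sizeBatcher struct {
	maxBytes     int
	currentBytes int
	flush        func() // stands in for the bulk processor's Flush
}

// add flushes first if the next document would push the batch over the
// limit, then accounts for the new document's size.
func (b *sizeBatcher) add(docBytes int) {
	if b.currentBytes+docBytes > b.maxBytes {
		b.flush()
		b.currentBytes = 0
	}
	b.currentBytes += docBytes
}

func main() {
	flushes := 0
	b := &sizeBatcher{
		maxBytes: 10 * 1024 * 1024, // 10 MiB, the common AWS OpenSearch limit
		flush:    func() { flushes++ },
	}

	b.add(8 * 1024 * 1024)        // 8 MiB queued; still under the limit
	b.add(2*1024*1024 + 512*1024) // adding 2.5 MiB would overflow, so flush first

	fmt.Printf("flushes: %d, queued after: %d bytes\n", flushes, b.currentBytes)
	// Output: flushes: 1, queued after: 2621440 bytes
}
```

With a 10 MiB limit, queuing 8 MiB and then 2.5 MiB triggers exactly one flush before the second document is added, so no single request exceeds the limit.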