gitlab-elasticsearch-indexer should flush bulk index requests to avoid over-limit errors


Background

Related: https://gitlab.com/gitlab-com/request-for-help/-/issues/2553#note_2450687212

From the discussion thread above:

Related post: https://stackoverflow.com/questions/69686441/what-is-the-amazon-opensearch-equivalent-for-http-max-content-length-and-can-i-i

AWS OpenSearch has one of two limits on the maximum bulk request size, depending on the domain's instance type: 10 MiB (the common case) or 100 MiB.

Our troubleshooting docs recommend lowering the maximum bulk request size to 9 MB on AWS to keep requests under that limit.

Debugging

Bulk indexing is done in two places for advanced search: on the Rails side and in gitlab-elasticsearch-indexer.

The Rails side controls the bulk size and measures it in MiB. The max bulk size is passed to gitlab-elasticsearch-indexer, which also has its own default (also in MiB) as a fallback.
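
For illustration, a minimal sketch of a size limit with a built-in default on the Go side. The names here (Config, MaxBulkSize, DefaultMaxBulkSize) are hypothetical, not the indexer's actual configuration API:

package config

// DefaultMaxBulkSize is a hypothetical fallback limit, mirroring the
// 10 MiB cap common on AWS OpenSearch domains.
const DefaultMaxBulkSize = 10 * 1024 * 1024 // bytes

// Config is an illustrative stand-in for the indexer's configuration.
type Config struct {
	MaxBulkSize int // bytes; zero means the Rails side did not pass a value
}

// EffectiveMaxBulkSize returns the configured limit, or the default
// when no limit was passed in.
func (c Config) EffectiveMaxBulkSize() int {
	if c.MaxBulkSize > 0 {
		return c.MaxBulkSize
	}
	return DefaultMaxBulkSize
}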

The problem arises when adding a document to the bulk request takes the request size over the limit (for example, 8 MiB of data is already queued for indexing and the next document is 2.5 MiB, which pushes the request just over a 10 MiB limit).
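
Concretely, the missing check is simple arithmetic. A minimal, self-contained sketch (shouldFlushFirst and the hard-coded 10 MiB limit are illustrative):

package main

import "fmt"

// maxBulkSize mirrors the common 10 MiB AWS OpenSearch limit.
const maxBulkSize = 10 * 1024 * 1024

// shouldFlushFirst reports whether appending a document of docSize bytes
// to a batch already holding queuedSize bytes would exceed the limit.
func shouldFlushFirst(queuedSize, docSize int) bool {
	return queuedSize+docSize > maxBulkSize
}

func main() {
	queued := 8 * 1024 * 1024           // 8 MiB already queued
	next := 2*1024*1024 + 512*1024      // a 2.5 MiB document
	// Prints true: the batch must be flushed before adding the document.
	fmt.Println(shouldFlushFirst(queued, next))
}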

Proposal

On the Rails side, Gitlab::Elastic::BulkIndexer already keeps track of the size of the queued items and flushes if the next item would take the request over the limit.

gitlab-elasticsearch-indexer should do the same tracking and perform the check in its Index method.

Proposal from Duo (a sketch, lightly cleaned up here: error handling added, and Flush split into a locking wrapper plus an internal flushLocked so that Index, which already holds the mutex, can flush without deadlocking):
// Assumed imports for this sketch; the logkit alias is taken from the
// snippet and presumed to point at the indexer's existing logger.
import (
	"encoding/json"
	"fmt"
	"sync"

	logkit "gitlab.com/gitlab-org/labkit/log"
)

type Client struct {
	// ... existing fields (including bulk, bulkFailed, and maxBulkSize)
	currentBatchSize int
	mu               sync.Mutex // guards currentBatchSize
}

func (c *Client) Index(documentType, id string, thing interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Estimate the serialized document size. The extra 500 bytes is a
	// rough allowance for the bulk action metadata around each document.
	jsonBytes, err := json.Marshal(thing)
	if err != nil {
		logkit.WithError(err).Error("failed to estimate document size")
	}
	docSize := len(jsonBytes) + 500

	// Flush first if adding this document would push the batch over the limit.
	if c.currentBatchSize+docSize > c.maxBulkSize {
		logkit.Debug("flushing bulk processor: next document would exceed max bulk size")
		if err := c.flushLocked(); err != nil {
			logkit.WithError(err).Error("bulk flush failed")
		}
	}

	// Add to the batch and track its size.
	c.currentBatchSize += docSize

	// ... rest of existing Index logic
}

// Flush is safe for external callers; it takes the lock before flushing.
func (c *Client) Flush() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.flushLocked()
}

// flushLocked must be called with c.mu held.
func (c *Client) flushLocked() error {
	err := c.bulk.Flush()
	c.currentBatchSize = 0 // reset the counter after every flush attempt

	if err == nil && c.bulkFailed {
		err = fmt.Errorf("failed to perform all operations")
	}

	return err
}
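
Assuming the sketch above, a caller indexes documents and flushes once at the end; the loop and error handling here are illustrative:

for _, doc := range docs {
	client.Index("doc", doc.ID, doc)
}
if err := client.Flush(); err != nil {
	logkit.WithError(err).Error("final bulk flush failed")
}

Splitting Flush into a locking wrapper and flushLocked avoids a self-deadlock when Index flushes while already holding the mutex. One edge case to keep in mind: a single document larger than maxBulkSize would still be submitted on its own after the flush, so oversized documents may need separate handling.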