Use zset to store timings for bulk indexer

Terri Chu requested to merge 371526-use-zset-to-score-timestamps into master

What does this MR do and why?

Related to #371526

The bulk indexer uses a Redis sorted set (zset) to queue items for indexing into Elasticsearch. Currently, each item added to the sorted set via the track! method is given a unique floating-point score.

This MR adds a new zset that stores the start_time for each item in the queue. The start_time is used to calculate how long each item takes to be indexed into Elasticsearch. The scores are the same as in the existing zset, and each member is a score:timestamp combination to ensure uniqueness. The timestamp is fetched from Redis and converted to a floating-point representation.
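
For illustration, a minimal sketch of how track! could populate both zsets; the key names, payload, and surrounding structure are assumptions for illustration, not the shipped code:

require 'redis'
require 'json'

redis = Redis.new
# Hypothetical key names for one shard; the real keys live in the service.
queue_key  = 'elastic:bulk:0:zset'
timing_key = 'elastic:bulk:0:zset:timestamps'

score   = 4.0                                  # the item's unique float score
payload = { type: 'Issue', id: 7171 }.to_json

# Fetch the current time from Redis (the TIME command) so every process
# shares a single clock, then convert it to a float such as 1664387335.483011.
seconds, microseconds = redis.time
timestamp = seconds + microseconds / 1_000_000.0

redis.multi do |multi|
  multi.zadd(queue_key, score, payload)                   # existing queue zset
  multi.zadd(timing_key, score, "#{score}:#{timestamp}")  # new timing zset
end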

Successfully indexed items are removed using first_score and last_score to determine which range of the queue has completed, so each item must have a unique score. Reusing the existing scores makes it easy to remove items from the timing zset once indexing completes.
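
Continuing the sketch above, the cleanup this enables could look roughly like the following, assuming first_score and last_score bound the batch that was just flushed:

first_score, last_score = 1.0, 4.0

redis.multi do |multi|
  # Drop the completed score range from both zsets in one step.
  multi.zremrangebyscore(queue_key, first_score, last_score)
  multi.zremrangebyscore(timing_key, first_score, last_score)
end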

Note that, for the time being, the timing calculations support both the old and new approaches, because there is no way to backfill the timing zset for data already queued for indexing.

Screenshots or screen recordings

Screenshots are required for UI changes, and strongly recommended for all other merge requests.

How to set up and validate locally

Note: Ensure Elasticsearch is installed and Advanced Search is enabled and set up in GDK

  1. switch to master branch: git checkout master
  2. stop background jobs to prevent the worker from picking up indexing: gdk stop rails-background-jobs
  3. start rails console
  4. clear any items currently being tracked: Elastic::ProcessBookkeepingService.clear_tracking!
  5. load up a bunch of issues before the change introduced by this MR: Issue.last(50).each { |x| Elastic::ProcessBookkeepingService.track!(x) }
  6. check the queued items: Elastic::ProcessBookkeepingService.queued_items
  7. switch to this branch: git checkout 371526-use-zset-to-score-timestamps
  8. start rails console
  9. load up some of the same issues as before: Issue.last(5).each { |x| Elastic::ProcessBookkeepingService.track!(x) }
  10. check the queued items: Elastic::ProcessBookkeepingService.queued_items
Example output:
 pry(main)> Elastic::ProcessBookkeepingService.queued_items
=> {0=>[["Issue 7144 issue_7144 project_20", 1.0], ["Issue 7147 issue_7147 project_20", 2.0], ["Issue 7149 issue_7149 project_20", 3.0]],
 1=>[["Issue 7124 issue_7124 project_20", 1.0], ["Issue 7132 issue_7132 project_20", 2.0], ["Issue 7150 issue_7150 project_20", 3.0], ["Issue 7152 issue_7152 project_20", 4.0], ["Issue 7164 issue_7164 project_20", 5.0]],
 2=>[["Issue 7122 issue_7122 project_20", 1.0], ["Issue 7126 issue_7126 project_20", 2.0], ["Issue 7141 issue_7141 project_20", 3.0], ["Issue 7158 issue_7158 project_20", 4.0]],
 3=>[["Issue 7135 issue_7135 project_20", 1.0], ["Issue 7146 issue_7146 project_20", 2.0], ["Issue 7171 issue_7171 project_20", 4.0]],
 4=>[["Issue 7140 issue_7140 project_20", 1.0], ["Issue 7142 issue_7142 project_20", 2.0], ["Issue 7148 issue_7148 project_20", 3.0]],
 5=>[["Issue 7139 issue_7139 project_20", 1.0], ["Issue 7160 issue_7160 project_20", 2.0], ["Issue 7163 issue_7163 project_20", 3.0], ["Issue 7167 issue_7167 project_20", 5.0]],
 7=>[["Issue 7130 issue_7130 project_20", 1.0], ["Issue 7169 issue_7169 project_20", 3.0]],
 8=>[["Issue 7123 issue_7123 project_20", 1.0], ["Issue 7137 issue_7137 project_20", 2.0], ["Issue 7154 issue_7154 project_20", 3.0], ["Issue 7162 issue_7162 project_20", 4.0]],
 9=>[["Issue 7125 issue_7125 project_20", 1.0], ["Issue 7155 issue_7155 project_20", 2.0], ["Issue 7156 issue_7156 project_20", 3.0], ["Issue 7165 issue_7165 project_20", 4.0]],
 10=>[["Issue 7128 issue_7128 project_20", 1.0], ["Issue 7166 issue_7166 project_20", 2.0]],
 11=>
  [["Issue 7131 issue_7131 project_20", 1.0],
   ["Issue 7138 issue_7138 project_20", 2.0],
   ["Issue 7143 issue_7143 project_20", 3.0],
   ["Issue 7157 issue_7157 project_20", 4.0],
   ["Issue 7159 issue_7159 project_20", 5.0],
   ["Issue 7168 issue_7168 project_20", 7.0]],
 12=>[["Issue 7161 issue_7161 project_20", 1.0]],
 13=>[["Issue 7129 issue_7129 project_20", 1.0], ["Issue 7133 issue_7133 project_20", 2.0], ["Issue 7134 issue_7134 project_20", 3.0], ["Issue 7151 issue_7151 project_20", 4.0]],
 15=>[["Issue 7127 issue_7127 project_20", 1.0], ["Issue 7136 issue_7136 project_20", 2.0], ["Issue 7145 issue_7145 project_20", 3.0], ["Issue 7153 issue_7153 project_20", 4.0], ["Issue 7170 issue_7170 project_20", 6.0]]}
  11. check the queued timings (only the 5 items tracked on this branch should have timings): Elastic::ProcessBookkeepingService.queued_timings
Example output:
[3] pry(main)> Elastic::ProcessBookkeepingService.queued_timings
=> {3=>[["4:1664387335.483011", 4.0]], 5=>[["5:1664387335.480785", 5.0]], 7=>[["3:1664387335.48185", 3.0]], 11=>[["7:1664387335.481441", 7.0]], 15=>[["6:1664387335.482268", 6.0]]}
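
Each member above is a score:timestamp pair, so the time an item has been waiting can be derived by splitting the member. A hypothetical illustration, not code from this MR:

member = "4:1664387335.483011"            # one entry from the output above
_score, started_at = member.split(":")
elapsed_s = Time.now.to_f - started_at.to_f
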
  12. load up new issues: Issue.first(50).each { |x| Elastic::ProcessBookkeepingService.track!(x) }
  13. check the queued items: Elastic::ProcessBookkeepingService.queued_items
Example output:
 pry(main)> Elastic::ProcessBookkeepingService.queued_items
=> {0=>
  [["Issue 7144 issue_7144 project_20", 1.0],
   ["Issue 7147 issue_7147 project_20", 2.0],
   ["Issue 7149 issue_7149 project_20", 3.0],
   ["Issue 4 issue_4 project_1", 4.0],
   ["Issue 10 issue_10 project_2", 5.0],
   ["Issue 23 issue_23 project_3", 6.0],
   ["Issue 29 issue_29 project_4", 7.0],
   ["Issue 45 issue_45 project_6", 8.0]],
 1=>
  [["Issue 7124 issue_7124 project_20", 1.0],
   ["Issue 7132 issue_7132 project_20", 2.0],
   ["Issue 7150 issue_7150 project_20", 3.0],
   ["Issue 7152 issue_7152 project_20", 4.0],
   ["Issue 7164 issue_7164 project_20", 5.0],
   ["Issue 3 issue_3 project_1", 6.0],
   ["Issue 32 issue_32 project_4", 7.0],
   ["Issue 42 issue_42 project_6", 8.0]],
 2=>[["Issue 7122 issue_7122 project_20", 1.0], ["Issue 7126 issue_7126 project_20", 2.0], ["Issue 7141 issue_7141 project_20", 3.0], ["Issue 7158 issue_7158 project_20", 4.0], ["Issue 40 issue_40 project_6", 5.0]],
 3=>
  [["Issue 7135 issue_7135 project_20", 1.0],
   ["Issue 7146 issue_7146 project_20", 2.0],
   ["Issue 7171 issue_7171 project_20", 4.0],
   ["Issue 5 issue_5 project_1", 5.0],
   ["Issue 18 issue_18 project_3", 6.0],
   ["Issue 31 issue_31 project_4", 7.0],
   ["Issue 38 issue_38 project_5", 8.0],
   ["Issue 48 issue_48 project_6", 9.0]],
 4=>[["Issue 7140 issue_7140 project_20", 1.0], ["Issue 7142 issue_7142 project_20", 2.0], ["Issue 7148 issue_7148 project_20", 3.0], ["Issue 1 issue_1 project_1", 4.0], ["Issue 13 issue_13 project_2", 5.0], ["Issue 44 issue_44 project_6", 6.0]],
 5=>
  [["Issue 7139 issue_7139 project_20", 1.0], ["Issue 7160 issue_7160 project_20", 2.0], ["Issue 7163 issue_7163 project_20", 3.0], ["Issue 7167 issue_7167 project_20", 5.0], ["Issue 36 issue_36 project_5", 6.0], ["Issue 46 issue_46 project_6", 7.0]],
 6=>[["Issue 2 issue_2 project_1", 1.0], ["Issue 19 issue_19 project_3", 2.0], ["Issue 20 issue_20 project_3", 3.0], ["Issue 22 issue_22 project_3", 4.0], ["Issue 34 issue_34 project_5", 5.0], ["Issue 39 issue_39 project_5", 6.0]],
 7=>[["Issue 7130 issue_7130 project_20", 1.0], ["Issue 7169 issue_7169 project_20", 3.0], ["Issue 33 issue_33 project_5", 4.0], ["Issue 47 issue_47 project_6", 5.0]],
 8=>
  [["Issue 7123 issue_7123 project_20", 1.0],
   ["Issue 7137 issue_7137 project_20", 2.0],
   ["Issue 7154 issue_7154 project_20", 3.0],
   ["Issue 7162 issue_7162 project_20", 4.0],
   ["Issue 9 issue_9 project_1", 5.0],
   ["Issue 12 issue_12 project_2", 6.0],
   ["Issue 16 issue_16 project_2", 7.0],
   ["Issue 17 issue_17 project_3", 8.0],
   ["Issue 35 issue_35 project_5", 9.0]],
 9=>
  [["Issue 7125 issue_7125 project_20", 1.0],
   ["Issue 7155 issue_7155 project_20", 2.0],
   ["Issue 7156 issue_7156 project_20", 3.0],
   ["Issue 7165 issue_7165 project_20", 4.0],
   ["Issue 8 issue_8 project_1", 5.0],
   ["Issue 14 issue_14 project_2", 6.0],
   ["Issue 43 issue_43 project_6", 7.0]],
 10=>[["Issue 7128 issue_7128 project_20", 1.0], ["Issue 7166 issue_7166 project_20", 2.0], ["Issue 6 issue_6 project_1", 3.0]],
 11=>
  [["Issue 7131 issue_7131 project_20", 1.0],
   ["Issue 7138 issue_7138 project_20", 2.0],
   ["Issue 7143 issue_7143 project_20", 3.0],
   ["Issue 7157 issue_7157 project_20", 4.0],
   ["Issue 7159 issue_7159 project_20", 5.0],
   ["Issue 7168 issue_7168 project_20", 7.0],
   ["Issue 11 issue_11 project_2", 8.0],
   ["Issue 15 issue_15 project_2", 9.0],
   ["Issue 28 issue_28 project_4", 10.0],
   ["Issue 30 issue_30 project_4", 11.0]],
 12=>[["Issue 7161 issue_7161 project_20", 1.0], ["Issue 21 issue_21 project_3", 2.0], ["Issue 25 issue_25 project_4", 3.0], ["Issue 41 issue_41 project_6", 4.0], ["Issue 49 issue_49 project_7", 5.0]],
 13=>
  [["Issue 7129 issue_7129 project_20", 1.0],
   ["Issue 7133 issue_7133 project_20", 2.0],
   ["Issue 7134 issue_7134 project_20", 3.0],
   ["Issue 7151 issue_7151 project_20", 4.0],
   ["Issue 7 issue_7 project_1", 5.0],
   ["Issue 27 issue_27 project_4", 6.0],
   ["Issue 37 issue_37 project_5", 7.0],
   ["Issue 50 issue_50 project_7", 8.0]],
 14=>[["Issue 26 issue_26 project_4", 1.0]],
 15=>
  [["Issue 7127 issue_7127 project_20", 1.0],
   ["Issue 7136 issue_7136 project_20", 2.0],
   ["Issue 7145 issue_7145 project_20", 3.0],
   ["Issue 7153 issue_7153 project_20", 4.0],
   ["Issue 7170 issue_7170 project_20", 6.0],
   ["Issue 24 issue_24 project_3", 7.0]]}
  14. check the queued timings: Elastic::ProcessBookkeepingService.queued_timings
Example output:
=> {0=>[["4:1664387429.95069", 4.0], ["5:1664387429.954059", 5.0], ["6:1664387429.960395", 6.0], ["7:1664387429.963891", 7.0], ["8:1664387429.971871", 8.0]],
 1=>[["6:1664387429.94995", 6.0], ["7:1664387429.96533", 7.0], ["8:1664387429.970702", 8.0]],
 2=>[["5:1664387429.969879", 5.0]],
 3=>[["4:1664387335.483011", 4.0], ["5:1664387429.951366", 5.0], ["6:1664387429.958068", 6.0], ["7:1664387429.964849", 7.0], ["8:1664387429.969045", 8.0], ["9:1664387429.973004", 9.0]],
 4=>[["4:1664387429.948105", 4.0], ["5:1664387429.955529", 5.0], ["6:1664387429.971489", 6.0]],
 5=>[["5:1664387335.480785", 5.0], ["6:1664387429.967313", 6.0], ["7:1664387429.972254", 7.0]],
 6=>[["1:1664387429.948912", 1.0], ["2:1664387429.958545", 2.0], ["3:1664387429.958976", 3.0], ["4:1664387429.95989", 4.0], ["5:1664387429.966504", 5.0], ["6:1664387429.969473", 6.0]],
 7=>[["3:1664387335.48185", 3.0], ["4:1664387429.965964", 4.0], ["5:1664387429.972645", 5.0]],
 8=>[["5:1664387429.953586", 5.0], ["6:1664387429.955057", 6.0], ["7:1664387429.957131", 7.0], ["8:1664387429.957583", 8.0], ["9:1664387429.966922", 9.0]],
 9=>[["5:1664387429.953052", 5.0], ["6:1664387429.95598", 6.0], ["7:1664387429.971111", 7.0]],
 10=>[["3:1664387429.951948", 3.0]],
 11=>[["7:1664387335.481441", 7.0], ["8:1664387429.95456", 8.0], ["9:1664387429.956643", 9.0], ["10:1664387429.963419", 10.0], ["11:1664387429.964363", 11.0]],
 12=>[["2:1664387429.959422", 2.0], ["3:1664387429.9617", 3.0], ["4:1664387429.970301", 4.0], ["5:1664387429.973375", 5.0]],
 13=>[["5:1664387429.952432", 5.0], ["6:1664387429.962924", 6.0], ["7:1664387429.968038", 7.0], ["8:1664387429.973764", 8.0]],
 14=>[["1:1664387429.962277", 1.0]],
 15=>[["6:1664387335.482268", 6.0], ["7:1664387429.961022", 7.0]]}
  15. initiate indexing: ElasticIndexBulkCronWorker.new.perform
  16. verify in the elasticsearch.log file that entries with message: "indexing_done" contain search_indexing_duration_s. Note that timings for items without an entry in the timing zset may be large, because record.updated_at is used as the fallback start time (see the sketch after the example below).
Example log entry:
{"severity":"INFO","time":"2022-09-28T17:54:27.347Z","correlation_id":null,"message":"indexing_done","model_class":"Issue","model_id":"2","es_id":"issue_2","es_parent":"project_1","search_indexing_duration_s":236.85733580589294,"search_indexing_flushing_duration_s":0.20339100004639477}

MR acceptance checklist

This checklist encourages us to confirm any changes have been analyzed to reduce risks in quality, performance, reliability, security, and maintainability.
