Our Elasticsearch (ES) cluster was experiencing unpredictable query slowdowns caused by irregular upsert volumes. The slowdowns occurred mostly during periods of heavy writes. We saw this on our graphs in Grafana, which we used for monitoring, and we knew which workers were running at those times.

Project Overview

The project was an aggregator of merchant products, synchronized from XML feeds. The key details:

  • Products table: 10M+ records, with 1-2M active at a time.
  • Database: MariaDB was the source of truth.
  • Elasticsearch Role: Used for sorting and querying product data, while details were fetched from the database based on product IDs.
  • Product States: active | inactive
  • Merchant States: paid | free | disabled (plus a fourth state, which I don’t recall).
  • Elasticsearch Indexing: Only active products from paid or free merchants were indexed.
  • Frequent Changes: Products were continuously inserted, updated, or deleted in ES, driven by product and merchant state changes, by workers processing specific product attributes, and by changes made by marketing moderators.
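
The indexing rule above (only active products from paid or free merchants) can be expressed as a small predicate. This is a minimal sketch: the method name `indexable?` and the use of symbols for states are assumptions made for illustration, not the project's actual code.

```ruby
# Hypothetical helper: decides whether a product belongs in the ES index.
# State names come from the list above; everything else is assumed.
INDEXABLE_MERCHANT_STATES = %i[paid free].freeze

def indexable?(product_state:, merchant_state:)
  product_state == :active && INDEXABLE_MERCHANT_STATES.include?(merchant_state)
end

indexable?(product_state: :active, merchant_state: :paid)      # => true
indexable?(product_state: :active, merchant_state: :disabled)  # => false
indexable?(product_state: :inactive, merchant_state: :free)    # => false
```

Any change that flips this predicate from true to false would mean a delete in ES, and the opposite flip would mean an upsert.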

Workflow with Multiple Workers

We had multiple workers handling different attributes of a record. Some workers were able to set all attributes of a record.

  • SynchronizerWorker
    • { '1' => { 'product_id' => 1, title: '', vectors: [], price: 300.2, category_id: 10 } }
  • VectorsWorker
    • { '1' => { 'product_id' => 1, vectors: { a: [], b: [], c: [] } } }
  • TitleWorker
    • { '1' => { 'product_id' => 1, title: '' } }
  • DescriptionWorker
    • { '1' => { 'product_id' => 1, description: 'lorem ipsum' } }
  • DeletionWorker
  • AdminWorker
    • { '1' => { 'product_id' => 1, category_id: 2 } }

Most of these could trigger ProductUpsertWorker or ProductDeleteWorker, leading to ES updates. Since these updates were not coordinated, they caused congestion and hurt query performance. Several processes handled the ES queue jobs, but unpredictable workloads created bottlenecks. ES handles an update by creating a new version of the updated record and then forwarding that new version to the other nodes in the cluster. In Grafana we saw spikes in memory usage: during those periods ES spent more RAM on handling inserts/upserts than on serving queries. There is more to say about how ES handles the creation and updating of records.

Potential Solutions Considered

  1. Hardcode worker execution times
    1. pros: it might help for the time being
    2. cons:
      1. hard to maintain over a long period of time
      2. every time we add a new worker or process, it might have a cascading effect
  2. Upgrade Elasticsearch hardware - faster NVMe drives (not an option due to non-budget constraints).
  3. Implement a single pipeline for ES updates (chosen approach).
    1. a single place that knows what is happening
    2. easy to debug - one point of failure
    3. one master over the cluster
    4. possibility to increase workload
    5. scalable
    6. testable solution
  4. Increase cache TTL on the catalogue aggregation query (also implemented)
    1. our first move was to increase the cache time, taking into account that data in ES could be outdated

Constraints

  • No additional hardware investment.
  • Limited developer time.
  • Minimal reliance on an external DevOps team (we were not their priority when it came to new features)
  • Maintainability

Chosen Solution: OnePipeline

Goals

  1. Centralize upserts/deletions to ES.
  2. Batch and merge product updates before writing to ES.
  3. Ensure fail-safety and retry mechanisms.
  4. Reduce the number of upserts per product.
  5. Implement a throttling mechanism.
  6. Allow disabling ES writes completely (done by turning the cron job on and off).
  7. Enable a gradual transition to OnePipeline from the old system.
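
Goals 2 and 4 (batch, merge, and reduce upserts per product) come down to deep merging partial payloads keyed by product ID, like the ones the workers listed earlier produce. The sketch below is an illustration under assumptions: `deep_merge` and `merge_messages` are hypothetical names, messages are assumed to arrive oldest first, and arrays are replaced rather than concatenated. In a Rails application, ActiveSupport's `Hash#deep_merge` offers the same behavior.

```ruby
# Recursively merges +update+ into +base+; on conflicts the newer value wins.
def deep_merge(base, update)
  base.merge(update) do |_key, old_value, new_value|
    if old_value.is_a?(Hash) && new_value.is_a?(Hash)
      deep_merge(old_value, new_value)
    else
      new_value
    end
  end
end

# Collapses a batch of messages (oldest first) into one payload per product.
def merge_messages(messages)
  messages.reduce({}) { |merged, message| deep_merge(merged, message) }
end

messages = [
  { '1' => { 'product_id' => 1, 'title' => 'Awesome Red Product', 'price' => 300.2 } },
  { '1' => { 'product_id' => 1, 'vectors' => { 'a' => [0.1] } } },
  { '2' => { 'product_id' => 2, 'category_id' => 10 } },
  { '1' => { 'product_id' => 1, 'category_id' => 2, 'vectors' => { 'b' => [0.2] } } }
]

merged = merge_messages(messages)
merged.size # => 2 (four incoming messages become two upserts)
merged['1']
# => { 'product_id' => 1, 'title' => 'Awesome Red Product', 'price' => 300.2,
#      'vectors' => { 'a' => [0.1], 'b' => [0.2] }, 'category_id' => 2 }
```

Three separate updates for product 1 turn into a single write, which is where the reduction in ES load comes from.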

Centralize upserts/deletions in one place.

Workers were scattered across a few applications. We needed a central job to gather all the data, and a way to stream it. Kafka was the obvious choice: we already used it and knew it well enough, and metrics and integrations were in place. We had an internal gem for message production, which made sending messages simple:

Messages::Produce.perform_async("one-pipeline", message_hash)

However, handling message data was tricky due to JSON parsing variations (e.g., symbolized vs. string keys, newline handling when pushing messages from a line-based file).
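
To illustrate the kind of variation involved, here is a small sketch using only Ruby's standard `json` library. The helper `parse_message` and the `StringIO` standing in for a file are assumptions for the example. It shows that symbol keys become strings after a JSON round trip, and that lines read from a file carry trailing newlines and may be blank.

```ruby
require 'json'
require 'stringio'

# Parses one raw line into a hash with string keys, or returns nil for blank lines.
def parse_message(raw_line)
  line = raw_line.strip
  return nil if line.empty?

  JSON.parse(line) # JSON.parse returns string keys by default
end

# The producer side builds the payload with symbol keys...
produced = { '1' => { product_id: 1, title: 'Awesome Red Product' } }

# ...and a line-based file adds newlines, here including one blank line.
file = StringIO.new(JSON.generate(produced) + "\n\n")

messages = file.each_line.map { |line| parse_message(line) }.compact
messages.size                # => 1
messages.first['1']['title'] # => "Awesome Red Product"
messages.first['1'][:title]  # => nil (a symbol lookup silently misses)
```

Settling on one key style at the consumer boundary keeps the later merging step from treating `:title` and `'title'` as two different attributes.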

Implementation

  1. Kafka Consumer with Karafka
    • Used Karafka to fetch multiple messages from a Kafka topic into an array.
    • Each message was a hash with one or many product_id values as the keys.
      • { '1' => { 'title' => 'Awesome Red Product' } }
    • In later iterations, messages were grouped under upsert or delete, and both ES 5 and ES 7 formats were supported (the hash structure for the two versions was slightly different).
    • Deep merging let us avoid multiple updates for the same product.
  2. Handling Kafka Limitations
    • Karafka was limited to fetching 1000 messages per batch, which was insufficient.
    • Alternative Kafka clients required additional dependencies and infrastructure changes (PRs to Ansible, Capistrano updates, etc.).
    • Sticking with Karafka avoided excessive complexity.
  3. Redis for Message Storage
    • Why Redis?
      • faster than the database
      • with the database we already had problems with too many connections
    • Created a new Kafka topic and extended the Karafka consumer.
    • The Karafka consumer stored the messages in a Redis list, one_pipeline_list.
    • Used the Redis Ruby interface - not all commands were available in Ruby, since the client wraps native functionality.
    • No expiration was set on the Redis list to ensure durability.
    • Consulted the DevOps team to confirm available Redis and Kafka storage capacity.
  4. Backup and Retry Mechanism
    • Introduced a backup_list alongside the main Redis list to ensure robustness.
    • When processing messages:
      • Moved them from main_list to backup_list.
      • Performed merging and queued Sidekiq jobs for ES updates.
      • Only after successful execution were messages removed from backup_list.
    • This ensured:
      • No data loss on process failure (e.g., Sidekiq termination with kill -9 from bash).
      • New data wouldn’t mix with partially processed data.
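
The backup and retry flow from step 4 can be sketched as follows. This is an illustration under several assumptions, not the original implementation: `FakeRedis` is an in-memory stand-in so the example runs without a server, the backup list name and `process_batch` are hypothetical, producers are assumed to `LPUSH`, and the block stands in for enqueuing the Sidekiq jobs. The stand-in's method names mirror commands the redis-rb client exposes (`lpush`, `rpoplpush`, `lrange`, `llen`, `del`). Note that Redis 6.2 deprecates `RPOPLPUSH` in favor of `LMOVE`.

```ruby
require 'json'

# In-memory stand-in for the handful of Redis list commands used below.
class FakeRedis
  def initialize
    @lists = Hash.new { |hash, key| hash[key] = [] }
  end

  def lpush(key, value)
    @lists[key].unshift(value).size
  end

  # Pops from the tail of +source+ and pushes onto the head of +destination+.
  def rpoplpush(source, destination)
    value = @lists[source].pop
    @lists[destination].unshift(value) unless value.nil?
    value
  end

  def lrange(key, start, stop)
    @lists[key][start..stop] || []
  end

  def llen(key)
    @lists[key].size
  end

  def del(key)
    @lists.delete(key) ? 1 : 0
  end
end

MAIN_LIST = 'one_pipeline_list'
BACKUP_LIST = 'one_pipeline_backup_list' # hypothetical name

def deep_merge(base, update)
  base.merge(update) do |_key, old_value, new_value|
    old_value.is_a?(Hash) && new_value.is_a?(Hash) ? deep_merge(old_value, new_value) : new_value
  end
end

# Moves a batch to the backup list, merges it, and hands the result to the block.
# The backup list is cleared only if the block returns without raising.
def process_batch(redis, batch_size: 1000)
  # Leftovers from an interrupted run are retried on their own first,
  # so new data never mixes with partially processed data.
  if redis.llen(BACKUP_LIST).zero?
    batch_size.times { break if redis.rpoplpush(MAIN_LIST, BACKUP_LIST).nil? }
  end

  # The backup list holds the newest message at its head, so reverse it.
  raw_messages = redis.lrange(BACKUP_LIST, 0, -1).reverse
  return {} if raw_messages.empty?

  merged = raw_messages.map { |raw| JSON.parse(raw) }
                       .reduce({}) { |acc, message| deep_merge(acc, message) }
  yield merged
  redis.del(BACKUP_LIST)
  merged
end

redis = FakeRedis.new
redis.lpush(MAIN_LIST, JSON.generate({ '1' => { 'title' => 'Red' } }))
redis.lpush(MAIN_LIST, JSON.generate({ '1' => { 'price' => 300.2 } }))

# First attempt fails part-way: the messages stay in the backup list.
begin
  process_batch(redis) { |_merged| raise 'worker interrupted' }
rescue RuntimeError
  nil
end
redis.llen(BACKUP_LIST) # => 2

# The retry picks up the same messages and clears the backup on success.
sent = nil
process_batch(redis) { |merged| sent = merged }
sent                    # => { '1' => { 'title' => 'Red', 'price' => 300.2 } }
redis.llen(BACKUP_LIST) # => 0
```

A hard kill does not raise an exception, but the effect on the lists is the same: the delete is never reached, so the next run finds the messages still waiting in the backup list.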

Results

  • Reduced ES upsert volume, leading to more predictable query performance.
    • we logged this with Rollbar, attaching the ratio of upsert_records to input_records_count as additional data
  • Faster query execution times
  • Easier debugging & failure recovery, thanks to Redis-backed retry mechanisms.
  • Smooth transition from the old system, as we could toggle between methods.
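
The logged ratio mentioned above is a simple division. A minimal sketch follows, with the method name `upsert_ratio` and the sample numbers made up for illustration. The actual Rollbar call is left out.

```ruby
# Ratio of upserts sent to ES to messages received; lower means more merging.
def upsert_ratio(upsert_records, input_records_count)
  return 0.0 if input_records_count.zero?

  upsert_records.fdiv(input_records_count)
end

upsert_ratio(2, 4) # => 0.5 (illustrative numbers: four messages merged into two upserts)
```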

There was no fancy verification of everything with hard data. After the implementation we added more vector data to the product records in ES and had no problems with ES afterwards. We no longer had to ask ourselves: will ES fail at prime time? OnePipeline took over responsibility for managing the ES workload.

Multiple Deployments

Most of the changes in this technical challenge, called OnePipeline, did not depend on each other. As far as I remember, I divided the work into the following PRs:

  • message producer change to add the new topic to the list
  • new Karafka consumer that puts messages on the Redis list
  • OnePipeline worker - to deep merge data
  • change each call that upserted ES data to produce to the one-pipeline topic instead

The MVP of this challenge was to upsert data in one place. Later on we refactored it to handle deletions and an additional ES version, ES 7. There were also discussions about how to approach the problem, especially about Karafka and how to fetch many messages at once.

Yes, each change was a separate deployment. A deployment took about 3-7 minutes :)