How to Bulk Index Elasticsearch Data
You may be using Elasticsearch in your tech stack, but can you make it more performant with bulk updates? If so, how? Read on to find out more.
Posted by Simar Mann Singh on 20 Sep, 2023
Introduction
Every digital application nowadays needs a fast and responsive data storage solution. While many modern databases can store terabytes or even petabytes of data with ease, few of them also offer near real-time search on top of that stored data. And where such solutions exist, adding search to a distributed system makes things harder still.
So, when it comes to distributed data, it is very important to select a sound solution for adding search functionality.
A short detour down history lane
Back in the mid-2000s, Shay Banon started an open-source project called Compass. He had been looking for a search solution and couldn't find a good option in those days. Apache Lucene was there, but it was (and still is) a Java library. That meant the application had to be written in Java to take advantage of it. It did not offer any REST API, and deploying it to a distributed system was a challenge in itself. Banon saw a gap that needed to be filled. He put on his thinking hat and started working on the project that was released in 2010 as Elasticsearch. With Elasticsearch, Banon aimed to build a scalable, distributed search engine that could handle large volumes of data and provide real-time search capabilities. He envisioned a distributed search engine built on top of Apache Lucene that would be easy to use, highly scalable, and capable of handling various use cases including full-text search, log analytics, and real-time data monitoring.
Building Elasticsearch, a distributed search engine that can scale across hundreds or thousands of nodes while maintaining performance and reliability, must have been a challenge.
But let's not get too deep into history. Let's move on.
How Elasticsearch Works
Elasticsearch is a distributed, RESTful search and analytics engine built on top of Apache Lucene.
Elasticsearch stores data in the form of JSON documents. These documents are organized into indices, which are logical collections of documents that share similar characteristics or belong to the same data category.
Under the hood, it uses inverted indexes, which are data structures optimized for full-text search.
An inverted index stores a mapping from terms to the documents that contain them, enabling fast lookup and retrieval of relevant documents based on search queries.
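To make the idea concrete, here is a minimal sketch of an inverted index in plain Python. It is a toy illustration only, not how Lucene or Elasticsearch implement it: the function names are made up for this example, and real engines add analysis, scoring, and compressed on-disk structures.

```python
from collections import defaultdict

def build_inverted_index(docs):
    """Map each lowercase term to the set of document IDs that contain it."""
    index = defaultdict(set)
    for doc_id, text in docs.items():
        for term in text.lower().split():
            index[term].add(doc_id)
    return index

def search(index, query):
    """Return the IDs of documents that contain every term in the query."""
    terms = query.lower().split()
    if not terms:
        return set()
    result = set(index.get(terms[0], set()))
    for term in terms[1:]:
        result &= index.get(term, set())
    return result

docs = {
    1: "the quick brown fox",
    2: "the lazy brown dog",
    3: "a quick red fox",
}
index = build_inverted_index(docs)
print(sorted(search(index, "quick fox")))  # [1, 3]
print(sorted(search(index, "brown")))      # [1, 2]
```

The lookup never scans the documents themselves. It only intersects the small sets stored under each term, which is why this structure suits full-text search.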
For querying, Elasticsearch provides a powerful query language that allows users to perform full-text search, structured queries, and aggregations on their data. Not only that, what made Elasticsearch so popular is the fact that it also exposes a RESTful API that allows users to interact with the system using standard HTTP methods. This API can be used to index, search, update, and delete documents, as well as manage cluster settings and monitor cluster health.
Let's go ahead and see how we can reap the benefits of Elasticsearch.
For the rest of this blog post, all code snippets and examples are in Python.
Indexing Data in ElasticSearch
Normally, when we have a small amount of data, we can simply add it to the index one document at a time, without resorting to the bulk API. For that, something like this works fine.
# main.py
from elasticsearch import Elasticsearch

# Initialize Elasticsearch client
es = Elasticsearch(['http://localhost:9200'])

# Example data to be indexed
data = {
    "title": "Example Document",
    "content": "This is some example content for the document.",
    "tags": ["example", "test"]
}

# Index the data into Elasticsearch
index_name = "example_index"
doc_id = "1"  # Document ID
# Recent client versions take the payload as document=; older 7.x releases use body= instead
response = es.index(index=index_name, id=doc_id, document=data)

# Print the response
print(response)
However, things get interesting when the input data grows. For applications such as data science or data analysis, we need to index huge amounts of data very quickly. In such cases, the above approach would be a terrible idea: every document costs one HTTP round trip, so indexing a large dataset this way could take hours.
Let's take a look at the bulk APIs now.
Bulk Update
The Elasticsearch Python client ships with a number of helper functions. They make our life easier by abstracting away the complexities and specific format requirements of the bulk API. All bulk helpers accept an instance of the Elasticsearch class and an iterable of actions. Any iterable works, but a generator is ideal in most cases, since it lets you index large datasets without loading them into memory.
For bulk updates, we will examine the following helpers provided by the Elasticsearch Python client.
- bulk()
- async_bulk()
- parallel_bulk()
- streaming_bulk()
- async_streaming_bulk()
bulk()
This helper function can be called once with data that contains multiple records, and these records are added to the index in one REST call (for larger inputs, the helper splits the actions into chunks and sends one request per chunk). Hence the name bulk.
The bulk() method is a synchronous function that sends multiple index, create, update, or delete requests to Elasticsearch in a single HTTP request. Note that it is not limited to indexing: the same call can update or delete documents too.
In simple words, each record carries an action (set through the _op_type field, which defaults to index), based on which Elasticsearch decides whether the document should be indexed, updated, or deleted. Many such records are stacked together, and the resulting newline-delimited JSON payload is sent as a single HTTP request.
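To see what "stacked together" means in practice, the sketch below builds the newline-delimited JSON body that the bulk REST endpoint expects, using only the standard library. The helper name to_bulk_body and the tuple layout are assumptions made for this illustration; the Python helpers do this serialization for you.

```python
import json

def to_bulk_body(actions):
    """Serialize (op_type, metadata, document) tuples into an NDJSON bulk body."""
    lines = []
    for op_type, meta, doc in actions:
        # First line of every action: the operation and its metadata
        lines.append(json.dumps({op_type: meta}))
        if op_type == "update":
            # Updates wrap the partial document in a "doc" key
            lines.append(json.dumps({"doc": doc}))
        elif op_type != "delete":
            # index/create are followed by the document source; delete has no second line
            lines.append(json.dumps(doc))
    # The bulk body must end with a trailing newline
    return "\n".join(lines) + "\n"

actions = [
    ("index", {"_index": "mywords", "_id": "1"}, {"word": "foo"}),
    ("update", {"_index": "mywords", "_id": "1"}, {"word": "bar"}),
    ("delete", {"_index": "mywords", "_id": "1"}, None),
]
body = to_bulk_body(actions)
print(body, end="")
```

Three actions become five lines: two each for index and update, and a single line for delete.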
It is also important to note that bulk() is synchronous: it waits for the response from Elasticsearch before proceeding to the next batch of actions. So this method is naturally suited to relatively small datasets, or to cases where synchronous processing is preferred. For even higher throughput, keep reading for the next methods.
# main.py
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Initialize Elasticsearch client
es = Elasticsearch(['http://localhost:9200'])

def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "word": word,
        }

# Bulk indexing using bulk() method
# bulk() returns (success count, list of errors); by default it raises BulkIndexError on failures
success, failed = bulk(es, gendata())
print(f"Bulk indexing using bulk() method - Success: {success}, Failed: {failed}")
Notice the yield. It makes gendata() a generator function: it hands out one value at a time, pauses until the consumer asks for the next one, and then continues from where it left off.
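The pausing behaviour is easy to observe with a small experiment. In this sketch, which needs no Elasticsearch connection, the generator records each value it produces in a log list (a parameter added purely for demonstration), so we can check that nothing is produced until a consumer asks for it.

```python
def gendata(words, log):
    """Yield one bulk action per word, recording when each one is produced."""
    for word in words:
        log.append(f"produced {word}")
        yield {"_index": "mywords", "word": word}

log = []
actions = gendata(["foo", "bar", "baz"], log)

# Creating the generator runs none of its body yet
print(log)    # []

# Asking for one item runs the body up to the first yield, then pauses
first = next(actions)
print(first)  # {'_index': 'mywords', 'word': 'foo'}
print(log)    # ['produced foo']

# Consuming the rest resumes from where the generator left off
rest = list(actions)
print(len(rest), log[-1])  # 2 produced baz
```

This is what allows the bulk helpers to index datasets larger than memory: they pull only as many actions as they need for the next request.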
async_bulk()
This is just the asynchronous version of the bulk() method. It is a coroutine that must be awaited, and it works with an AsyncElasticsearch client instead of the regular one. This method is suitable for asynchronous programming environments such as asyncio.
# main.py
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk

es = AsyncElasticsearch(['http://localhost:9200'])

async def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "word": word,
        }

async def main():
    await async_bulk(es, gendata())
    await es.close()  # release the underlying HTTP session

asyncio.run(main())
We pass an async generator to the async_bulk() helper function, which pulls actions from it and awaits each bulk request, leaving the event loop free to run other tasks in the meantime.
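The flow can be sketched without a running cluster. In the example below, fake_send is a hypothetical stand-in for one bulk HTTP request, and bulk_in_batches is a simplified illustration of the consume-batch-await pattern, not the library's actual implementation.

```python
import asyncio

async def gendata():
    for word in ["foo", "bar", "baz", "qux", "quux"]:
        yield {"_index": "mywords", "word": word}

async def fake_send(batch):
    """Hypothetical stand-in for one bulk HTTP request."""
    await asyncio.sleep(0)  # hand control back to the event loop, as real I/O would
    return len(batch)

async def bulk_in_batches(actions, batch_size=2):
    """Pull actions from an async generator and send them in fixed-size batches."""
    sent = 0
    batch = []
    async for action in actions:
        batch.append(action)
        if len(batch) == batch_size:
            sent += await fake_send(batch)
            batch = []
    if batch:  # send the final, possibly smaller, batch
        sent += await fake_send(batch)
    return sent

total = asyncio.run(bulk_in_batches(gendata()))
print(total)  # 5
```

Each await is a point where other coroutines can make progress, which is the main benefit over the synchronous bulk() helper inside an asyncio application.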
parallel_bulk()
The parallel_bulk() method is designed for bulk indexing large datasets efficiently in parallel.
It is basically a wrapper around the bulk API that processes actions in parallel using multiple threads, which can significantly improve indexing speed.
It sends bulk HTTP requests concurrently from a pool of threads, which makes it suitable for indexing large volumes of data when speed is a priority. Note that parallel_bulk() returns a generator, so its results must be consumed for any request to be sent.
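The threading idea can be sketched with the standard library alone. The example below is an assumption-laden illustration rather than the helper's real code: fake_send stands in for one bulk HTTP request, parallel_send is a made-up name, and concurrent.futures is used here simply because it is the most familiar thread pool in Python.

```python
from concurrent.futures import ThreadPoolExecutor

def fake_send(chunk):
    """Hypothetical stand-in for one bulk HTTP request; reports every action as ok."""
    return [(True, action["word"]) for action in chunk]

def parallel_send(chunks, thread_count=4):
    """Send chunks concurrently from a thread pool and yield per-action results."""
    with ThreadPoolExecutor(max_workers=thread_count) as pool:
        # map() runs fake_send on several chunks at once but returns results in order
        for results in pool.map(fake_send, chunks):
            yield from results

words = ["foo", "bar", "baz", "qux", "quux"]
actions = [{"_index": "mywords", "word": w} for w in words]
# Split the actions into chunks of two; each chunk represents one request
chunks = [actions[i:i + 2] for i in range(0, len(actions), 2)]

results = list(parallel_send(chunks))
print(results)
```

Because parallel_send is a generator, nothing is sent until the results are iterated, which mirrors the behaviour of parallel_bulk() itself.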
from elasticsearch import Elasticsearch
from elasticsearch.helpers import parallel_bulk

# Initialize Elasticsearch client
es = Elasticsearch(['http://localhost:9200'])

def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "word": word,
        }

# Bulk indexing using parallel_bulk() method
# parallel_bulk() is a lazy generator of (ok, result) tuples, so we must iterate over it
success, failed = 0, 0
for ok, result in parallel_bulk(es, gendata(), thread_count=4, raise_on_error=False):
    if ok:
        success += 1
    else:
        failed += 1
print(f"Bulk indexing using parallel_bulk() method - Success: {success}, Failed: {failed}")
streaming_bulk()
The streaming_bulk() method is similar to parallel_bulk() in that it yields the result of each action as a generator, but it sends the chunks one after another from a single thread. It streams data in and out of Elasticsearch, pulling actions from the iterable one chunk at a time, so the entire dataset never has to be loaded into memory at once.
This method is suitable for very large datasets that might not fit into memory.
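The memory saving comes from chunking the input lazily. The sketch below shows the idea with itertools.islice; the chunked function is a simplified illustration written for this post, not the helper's internal code. The chunk size of 500 matches the default chunk_size of the bulk helpers.

```python
from itertools import islice

def chunked(actions, chunk_size):
    """Yield lists of at most chunk_size actions, pulling lazily from the source."""
    iterator = iter(actions)
    # islice takes the next chunk_size items; an empty list means the source is exhausted
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk

def gendata(n):
    for i in range(n):
        yield {"_index": "mywords", "word": f"word-{i}"}

sizes = [len(chunk) for chunk in chunked(gendata(1200), 500)]
print(sizes)  # [500, 500, 200]
```

Only one chunk exists in memory at any moment, no matter how many actions the generator eventually produces.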
# main.py
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

es = Elasticsearch(['http://localhost:9200'])

def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "word": word,
        }

def main():
    # raise_on_error=False reports failures through ok instead of raising an exception
    for ok, result in streaming_bulk(es, gendata(), raise_on_error=False):
        action, result = result.popitem()
        if not ok:
            print("failed to %s document %s" % (action, result))

main()
async_streaming_bulk()
This is the async version of the streaming_bulk() method. It awaits each bulk request without blocking the event loop, so other coroutines can run while a response is pending.
import asyncio
from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk

es = AsyncElasticsearch(['http://localhost:9200'])

async def gendata():
    mywords = ['foo', 'bar', 'baz']
    for word in mywords:
        yield {
            "_index": "mywords",
            "word": word,
        }

async def main():
    # raise_on_error=False reports failures through ok instead of raising an exception
    async for ok, result in async_streaming_bulk(es, gendata(), raise_on_error=False):
        action, result = result.popitem()
        if not ok:
            print("failed to %s document %s" % (action, result))
    await es.close()  # release the underlying HTTP session

asyncio.run(main())
So, as we saw, we can use multiple variants of the bulk API depending on the use case.
Further Reading
Readers are encouraged to also take a look at the original documentation for the Bulk Helper functions for any clarification.
- ElasticSearch Bulk RESTful API
- Bulk Helper functions v7.x
- Bulk Helper functions v8.13 - current at the time of writing this blog
- Async variants of Bulk helper functions
Feel free to write your opinions, questions or any errors you may have come across while reading this blog in the comments section below.
You can use the contact form as well.