Mastering Elasticsearch with Python: A Comprehensive Guide
Elasticsearch is a distributed search and analytics engine designed to handle large-scale data. It can scale horizontally by adding more nodes to the cluster, making it capable of handling high-volume data and providing fast search responses. It stores and indexes data in the form of JSON documents.
Each document contains one or more fields with their corresponding values. Data is indexed in Elasticsearch by specifying an index and a document ID (mapping types were removed in Elasticsearch 7 and later). It offers excellent search capabilities, including full-text search, filtering, aggregations, and more. Elasticsearch provides a Query DSL (Domain-Specific Language) that allows you to construct complex queries using JSON syntax. It indexes and analyzes data to enable fast and accurate search results across various types of documents.
For more detailed information about Elasticsearch and its features, you can visit the official website: Elasticsearch Official Website
Elasticsearch Python Library
The Python Elasticsearch library is the official Python client for Elasticsearch. It provides a high-level and low-level interface to interact with Elasticsearch.
You can install the library with pip3 install elasticsearch from your terminal, or !pip3 install elasticsearch in a notebook.
from elasticsearch import Elasticsearch

config = {
    "ES_USER": "elastic",
    "ES_PASS": "password",
}

es = Elasticsearch(
    cloud_id="id",
    basic_auth=(config['ES_USER'], config['ES_PASS']),
    request_timeout=60
)
The Elasticsearch class represents the Elasticsearch client and is used to establish a connection with an Elasticsearch cluster. Of course, you first need to have your own cluster, which you can create during a free Elasticsearch trial. You can create an instance of the client by specifying the Elasticsearch host and port.
To check the connection to the ES host, you can use es.ping():
if es.ping():
    print("Connected to Elasticsearch.")
else:
    print("Failed to connect to Elasticsearch.")
Instead of using host and port, I am using cloud_id and basic_auth with a username and password.
To establish a connection to Elastic Cloud using the Python Elasticsearch client, it is recommended to use the cloud_id parameter. You can locate this value on the "Manage Deployment" page, which becomes accessible after creating a cluster. In Kibana, you can find it in the top-left corner of the page.
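If you run a cluster outside Elastic Cloud, you can connect by URL instead of cloud_id. The sketch below just collects the keyword arguments; the URL and credentials are placeholders, and the actual client construction (commented out) needs the elasticsearch package and a running cluster:

```python
def client_kwargs(url, user, password, timeout=60):
    """Collect keyword arguments for Elasticsearch(**kwargs) when connecting
    by URL instead of cloud_id. All values here are hypothetical placeholders."""
    return {
        "hosts": [url],                  # list of cluster endpoint URLs
        "basic_auth": (user, password),  # same auth style as the cloud example
        "request_timeout": timeout,
    }

kwargs = client_kwargs("https://localhost:9200", "elastic", "password")

# With the client installed and a cluster running:
# from elasticsearch import Elasticsearch
# es = Elasticsearch(**kwargs)
```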
Index Management
Elasticsearch library provides functions for managing indices such as creating, deleting, checking the existence of an index, and more.
# Create an index
es.indices.create(index='test')

# Check if an index exists
if es.indices.exists(index='test'):
    print("Index exists")
else:
    print("Index doesn't exist")

# Delete an index
es.indices.delete(index='test')

if es.indices.exists(index='test'):
    print("Index exists")
else:
    print("Index doesn't exist")
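Indices can also be created with explicit settings and mappings instead of letting Elasticsearch infer field types. The sketch below builds a hypothetical mapping (the field names are illustrative, not from this article's data); the client call is commented out since it needs a live cluster:

```python
# Illustrative mapping: field names and types are assumptions for this sketch.
mappings = {
    "properties": {
        "title": {"type": "text"},       # analyzed, full-text searchable
        "created": {"type": "date"},     # date field
        "views": {"type": "integer"},    # numeric field
    }
}
settings = {"number_of_shards": 1, "number_of_replicas": 1}

# With a live cluster (8.x client keyword arguments):
# es.indices.create(index='articles', settings=settings, mappings=mappings)
```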
Document Indexing
You can index documents in Elasticsearch using the index function. Indexing a document in Elasticsearch entails adding data to the database in a structured manner, much like organizing information into folders, to facilitate quick searches and retrieval later on. This enables Elasticsearch to efficiently organize and retrieve specific information, streamlining the handling of large amounts of data. The document is represented as a Python dictionary and is associated with an index and an optional ID (Elasticsearch generates an ID if you omit it).
# Create an example document
example_doc = {
    "title": "Example Document",
    "content": "Content of an example document."
}

# Create an index
es.indices.create(index='example')

# Check if an index exists
if es.indices.exists(index='example'):
    print("Index exists")
    # Index a document
    es.index(index='example', id=1, document=example_doc)
else:
    print("Index doesn't exist.")
Output:
ObjectApiResponse({'_index': 'example', '_id': '1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 0, '_primary_term': 1})
_index: Indicates the name of the index where the document was indexed, in this case, 'example'.
_id: Represents the unique identifier assigned to the indexed document, which is '1' in this case.
_version: Denotes the version of the document after indexing, which is '1'.
result: Indicates the result of the indexing operation. In this case, it is 'created', indicating that the document was successfully created and indexed.
_shards: Provides information about the number of shards involved in the indexing process. Shards are smaller units of the index distributed across nodes in a cluster. In this case, 'total' is 2, indicating that the operation involved two shards, and 'successful' is 2, meaning the indexing was successful on all shards.
_seq_no and _primary_term: These terms relate to Elasticsearch's internal versioning system, which helps maintain consistency and handle conflicts in distributed environments. They represent specific details about the internal versioning of the document.
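Once indexed, a document can also be partially updated in place rather than re-indexed whole. A hedged sketch, assuming the 8.x client (which accepts a doc keyword for partial updates); the commented calls need a live cluster:

```python
# Partial update: only the listed fields change, the rest of _source is kept.
partial = {"content": "Updated content of the example document."}

# With a live cluster:
# es.update(index='example', id=1, doc=partial)    # increments _version
# es.get(index='example', id=1)['_source']         # shows the merged document
```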
Document Retrieval
You can retrieve documents from ES using various methods like get, search, etc. These methods allow you to query and filter the documents based on specific criteria.
# Get a document by ID
result = es.get(index='example', id='1')
print(result)
Output:
{'_index': 'example', '_id': '1', '_version': 1, '_seq_no': 0, '_primary_term': 1, 'found': True, '_source': {'title': 'Example Document', 'content': 'Content of an example document.'}}
Print the source data of the document:
doc = result['_source']
print(doc)
Output:
{'title': 'Example Document', 'content': 'Content of an example document.'}
doc = result['_source'] is used to access the actual source data of the document from the response, allowing you to work directly with the document's fields and values without additional parsing or extraction steps.
# Search documents
query = {
    'match': {
        'title': 'Example Document'
    }
}
results = es.search(index='example', query=query)
print(results)
Output:
{'took': 0, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 0, 'relation': 'eq'}, 'max_score': None, 'hits': []}}
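Note that the hits list is empty even though the document was just indexed. Elasticsearch is near-real-time: newly indexed documents only become searchable after an index refresh, which happens roughly once per second by default. Forcing a refresh before searching should make the document appear; a sketch (the commented calls require a live cluster):

```python
# Newly indexed documents are invisible to search until the index refreshes.
query = {'match': {'title': 'Example Document'}}

# With a live cluster:
# es.indices.refresh(index='example')                # force a refresh now
# results = es.search(index='example', query=query)
# results['hits']['total']['value']                  # should then be non-zero
```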
Bulk Operations
The library provides the bulk function to perform bulk operations like indexing, updating, deleting multiple documents in a single API call, which can significantly improve indexing performance. The bulk API accepts a list of action items, where each item represents a specific operation to be performed on a document.
Each action item consists of a combination of an operation (index, update, delete) and the corresponding document data. You can include multiple action items in a single bulk request to perform various operations simultaneously.
I will use a list of dictionaries as my documents to create bulk actions. We will need another index for this purpose. The data below is just a piece of the data I used, but it's enough for a hands-on exercise:
data = [
    {
        'id': 1,
        'title': 'NumPy',
        'description': 'NumPy is a powerful python library with many functions for creating and manipulating multi-dimensional arrays and matrices.'
    },
    {
        'id': 2,
        'title': 'Pandas',
        'description': 'Pandas is a Python library for data manipulation and analysis. It provides data structures for efficient storage of data and high-level manipulations.'
    },
    {
        'id': 3,
        'title': 'Scikit-Learn',
        'description': 'Scikit-Learn is a popular library for machine learning in Python. It provides tools to build, train, evaluate, and deploy machine learning algorithms.'
    },
    {
        'id': 4,
        'title': 'Matplotlib',
        'description': 'Matplotlib is a Python plotting library for creating publication quality plots. It can produce line graphs, histograms, power spectra, bar charts, and more.'
    },
    {
        'id': 5,
        'title': 'Seaborn',
        'description': 'Seaborn is a graphical library in Python for drawing statistical graphics. It provides a high level interface for drawing attractive statistical graphics.'
    }
]
from elasticsearch.helpers import bulk, BulkIndexError
# Create an index
py_indexname = 'py-libraries'
es.indices.create(index=py_indexname)

# Check if an index exists
if es.indices.exists(index=py_indexname):
    print("Index exists")
else:
    print("Index doesn't exist.")
actions = [
    {
        "_index": py_indexname,
        "_id": doc["id"],
        "_source": {
            "library": doc["title"],
            "description": doc["description"]
        }
    }
    for doc in data
]
try:
    bulk(es, actions)
    print("Data successfully indexed in the destination index.")
except BulkIndexError as e:
    print("Failed to index documents:")
    for err in e.errors:
        print(err)
Output:
Index exists
Data successfully indexed in the destination index.
The bulk() helper takes the Elasticsearch client instance (es) and the list of actions (actions); the target index is specified inside each action via the _index key. It returns the number of successfully processed actions together with a list of any errors.
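For larger datasets, it can help to factor the action-building into a generator so documents are streamed to bulk() rather than held in one big list. A small sketch, mirroring the field names used above:

```python
def make_actions(index_name, docs):
    """Yield one bulk 'index' action per document dictionary."""
    for doc in docs:
        yield {
            "_index": index_name,
            "_id": doc["id"],
            "_source": {
                "library": doc["title"],
                "description": doc["description"],
            },
        }

# Tiny sample to show the shape of the generated actions.
sample = [{"id": 1, "title": "NumPy", "description": "Array library."}]
actions = list(make_actions("py-libraries", sample))

# With a live cluster, the generator can be passed to bulk() directly:
# bulk(es, make_actions("py-libraries", data))
```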
Using a query and count() in Elasticsearch:
query = {
    "match_all": {}
}
es.count(index=py_indexname, query=query)
Output:
ObjectApiResponse({'count': 0, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}})
The count of 0 here again reflects that the index had not refreshed yet when the call ran; after a refresh, the count matches the number of indexed documents.
Search query:
# Execute the search query
response = es.search(index=py_indexname, query=query, size=100)

# Extract the results
results = response["hits"]["hits"]

# Print the documents
for result in results:
    print(result["_source"])
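Passing a large size works for small indices, but for anything bigger, the scan helper from elasticsearch.helpers pages through all matches via the scroll API. A hedged sketch; the commented calls require the client library and a live cluster:

```python
# scan() takes a full search request body, not just the query clause.
full_body = {"query": {"match_all": {}}}

# With a live cluster, scan() yields every matching hit lazily:
# from elasticsearch.helpers import scan
# for hit in scan(es, index=py_indexname, query=full_body):
#     print(hit["_source"])
```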
We can also see our data in the Kibana "Discover" tab:
You can see 53 hits there because my full dataset was 53 documents long; with the example data provided above, you should see 5 documents.
Aggregations
ES supports aggregations to perform analytics and gather insights from the data. The library provides functions to build and execute aggregations.
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk, BulkIndexError

config = {
    "ES_USER": "elastic",
    "ES_PASS": "password",
}
# Create an index
indexname = 'prices'
es.indices.create(index=indexname)

# Check if an index exists
if es.indices.exists(index=indexname):
    print("Index exists")
else:
    print("Index doesn't exist.")
actions = [
    {
        "_index": indexname,
        "_id": i,
        "_source": {
            "product": "example" + str(i),
            "price": 3.20 + i * 0.75
        }
    }
    for i in range(20)
]
try:
    bulk(es, actions)
    print("Data successfully indexed in the destination index.")
except BulkIndexError as e:
    print("Failed to index documents:")
    for err in e.errors:
        print(err)
query = {
    "match_all": {}
}
es.count(index=indexname, query=query)
# Execute the search query
response = es.search(index=indexname, query=query, size=20)

# Extract the results
results = response["hits"]["hits"]

# Print the documents
for result in results:
    print(result["_source"])
aggregation_query = {
    "aggs": {
        "avg_price": {
            "avg": {
                "field": "price"
            }
        }
    }
}

# Execute the aggregation query
# (the aggs keyword replaces the deprecated body parameter in the 8.x client;
# size=0 skips the document hits since only the aggregation is needed)
response = es.search(index=indexname, aggs=aggregation_query["aggs"], size=0)

# Get the average price from the response
avg_price = response['aggregations']['avg_price']['value']
print(f"Average Price: {round(avg_price,2)}")
Output:
Average Price: 10.32
Thank you!
I hope you found this article on Elasticsearch and Python helpful and insightful. If you have any questions, suggestions, or just want to connect, feel free to reach out to me on LinkedIn!
Let’s continue the conversation, collaborate, and stay connected!