From ESP8266 to Telegram Bot
In the realm of data engineering, a data pipeline serves as a vital mechanism for orchestrating the flow of data from various sources to its destination, often involving processing, transformation, and storage along the way. These pipelines come in different flavors, ranging from simple batch processes to complex real-time streaming architectures.
I understand that this might seem intimidating and complex, but I'll do my best to explain and demonstrate it in a way that's easy to understand. To start, let's consider a use case. Imagine that you have a pet reptile🐍 that is sensitive to changes in temperature and humidity. If you fail to provide the appropriate environment for your pet, it will become stressed, stop eating, and eventually die. Wouldn't it be helpful to have tools that allow you to maintain, monitor, and receive updates on the status of your reptile's environment?
Well... first of all, you need a sensor to measure the environment’s temperature and humidity. Then, how do you access the data? This is where we need a data pipeline from the sensor to a platform we can easily interact with.
This is the big picture of what I’m going to build; let’s dive into the technical details next. Note that some of the tools and methods used here are not considered best practice; they were chosen to give a foundational understanding of “how it works”.
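To make the big picture concrete, here is a minimal in-memory sketch of the same three stages in Python. It is an illustration only, not part of the real build: the function names (`collect_reading`, `store_reading`, `latest_reading`) are hypothetical, and a plain list stands in for the BigQuery table. In the actual pipeline these roles are played by the ESP8266, the Cloud Function writing to BigQuery, and the Telegram bot.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional

# Hypothetical stand-in for the BigQuery table
warehouse: List[Dict] = []

TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def collect_reading(temperature: float, humidity: float) -> Dict:
    """Stage 1 (ESP8266): label a reading with its UTC collection time."""
    return {
        "sensor_timestamp": datetime.now(timezone.utc).strftime(TS_FORMAT),
        "temperature": temperature,
        "humidity": humidity,
    }


def store_reading(reading: Dict) -> None:
    """Stage 2 (Cloud Function + BigQuery): add ingestion metadata and persist the row."""
    now = datetime.now(timezone.utc)
    row = dict(reading, cf_timestamp=now.strftime(TS_FORMAT), job_date=now.strftime("%Y-%m-%d"))
    warehouse.append(row)


def latest_reading() -> Optional[Dict]:
    """Stage 3 (Telegram bot): answer a /latest-style question."""
    return max(warehouse, key=lambda row: row["cf_timestamp"], default=None)


store_reading(collect_reading(28.5, 71.2))
print(latest_reading())
```

Each stage only hands a plain record to the next one, which is what allows the real pipeline to replace the list with BigQuery and the direct function calls with HTTP requests.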
Step 1: Collecting Data
I’m going to use the ESP8266 with the DHT22 sensor to collect temperature and humidity data because the ESP8266 is a microcontroller with built-in Wi-Fi capabilities, making it ideal for IoT projects that require wireless connectivity. The DHT22 sensor is a reliable and accurate sensor for measuring temperature and humidity. This allows me to collect data from the sensor and send it over Wi-Fi to a service platform for further processing and storage.
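Even a reliable sensor sometimes fails a read. The DHT library (DHT.h) used in this project returns NaN in that case, and a value outside the DHT22's datasheet measuring range (-40 to 80 °C, 0 to 100 % RH) usually points to a wiring or sensor fault. A small plausibility check, sketched here in Python with a hypothetical `is_plausible` helper so it could run on the receiving side, keeps such values out of the pipeline.

```python
import math
from typing import Optional

# DHT22 (AM2302) measuring range from its datasheet
TEMP_RANGE_C = (-40.0, 80.0)
HUMIDITY_RANGE_PCT = (0.0, 100.0)


def is_plausible(temperature: Optional[float], humidity: Optional[float]) -> bool:
    """Return True if a DHT22 reading is a real number inside the sensor's measuring range."""
    for value in (temperature, humidity):
        if value is None or math.isnan(value):
            return False  # failed read
    t_min, t_max = TEMP_RANGE_C
    h_min, h_max = HUMIDITY_RANGE_PCT
    return t_min <= temperature <= t_max and h_min <= humidity <= h_max


print(is_plausible(28.5, 71.2))          # True
print(is_plausible(float("nan"), 71.2))  # False
print(is_plausible(85.0, 50.0))          # False
```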
After wiring everything up🔌, I moved on to the code. These are the libraries I used in this project:
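#include <NTPClient.h>          // NTP client, used for timestamps
#include <ESP8266WiFi.h>        // Wi-Fi connectivity
#include <WiFiUdp.h>            // UDP transport required by NTPClient
#include <DHT.h>                // DHT temperature/humidity sensor driver
#include <ESP8266HTTPClient.h>  // HTTP POST to the Cloud Function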
I’m using the NTPClient library to obtain timestamps so that each reading is labeled with the time it was collected. The timestamps are in UTC, since the client runs with its default zero time offset and the time is formatted with gmtime. Once a reading is collected and timestamped, it is sent over Wi-Fi to a service platform with an HTTP POST request for further processing.
// Replace with your network credentials
const char* ssid = "YOUR SSID";
const char* password = "YOUR PASSWORD";

// Define NTP client to get time (default server pool.ntp.org, offset 0, so times are UTC)
WiFiUDP ntpUDP;
NTPClient timeClient(ntpUDP);

// Define the DHT sensor type and pin (GPIO4)
#define DHTPIN 4
#define DHTTYPE DHT22

// Initialize DHT sensor
DHT dht(DHTPIN, DHTTYPE);

// Forward declaration (the Arduino IDE generates this automatically; other toolchains do not)
bool sendDataToServer(const String& data);

void setup() {
  pinMode(LED_BUILTIN, OUTPUT);  // Initialize the LED_BUILTIN pin as an output
  Serial.begin(115200);
  dht.begin();

  // Connect to Wi-Fi
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(1000);
    Serial.println("Connecting to WiFi...");
  }

  // Initialize the NTP client
  timeClient.begin();
  Serial.println("Connected to WiFi");
}

void loop() {
  // Blink the built-in LED as a heartbeat
  digitalWrite(LED_BUILTIN, LOW);
  delay(1000);
  digitalWrite(LED_BUILTIN, HIGH);
  delay(1000);

  // Update the NTP client and get the current time as Unix epoch seconds (UTC)
  timeClient.update();
  time_t now = timeClient.getEpochTime();

  // Format the timestamp as "YYYY-MM-DD HH:MM:SS"
  char timestamp[30];
  strftime(timestamp, sizeof(timestamp), "%Y-%m-%d %H:%M:%S", gmtime(&now));
  String sensorTimestamp = String(timestamp);

  // Read the current temperature (°C) and humidity (%) from the DHT sensor
  float humidity = dht.readHumidity();
  float temperature = dht.readTemperature();

  // A failed read returns NaN, which would be printed as "nan" and make the JSON invalid
  if (isnan(humidity) || isnan(temperature)) {
    Serial.println("Failed to read from DHT sensor");
    delay(5000);
    return;
  }

  // Build the JSON payload by hand
  String jsonData = "{\"sensor_timestamp\":\"" + sensorTimestamp +
                    "\",\"temperature\":" + String(temperature) +
                    ",\"humidity\":" + String(humidity) + "}";

  // Print the JSON data to the serial monitor
  Serial.println(jsonData);

  // Perform an HTTP POST request
  if (sendDataToServer(jsonData)) {
    Serial.println("Data sent successfully");
  } else {
    Serial.println("Failed to send data");
  }

  // Wait 5 seconds before the next measurement (about 7 s per cycle including the blink)
  delay(5000);
}

bool sendDataToServer(const String& data) {
  // A plain WiFiClient only works with http:// URLs; an https:// endpoint needs WiFiClientSecure
  WiFiClient client;
  HTTPClient http;

  // Begin the HTTP request
  if (!http.begin(client, "YOUR URL")) {
    Serial.println("Unable to start HTTP connection");
    return false;
  }
  http.setTimeout(15000);  // 15-second timeout
  http.addHeader("Content-Type", "application/json");

  // Send the HTTP request and check the response code
  int httpCode = http.POST(data);
  bool ok = (httpCode == HTTP_CODE_OK);
  if (!ok) {
    Serial.printf("HTTP error code: %d\n", httpCode);
  }

  // Always release the connection before returning
  http.end();
  return ok;
}
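To see exactly what the firmware sends, here is the payload-building step of `loop()` re-created in Python. It assumes, as the Arduino sketch does, that NTPClient keeps its default zero time offset, so `gmtime()` yields UTC, and that Arduino's `String(float)` prints two decimal places; `build_payload` is a hypothetical name.

```python
import json
import time


def build_payload(epoch_seconds: int, temperature: float, humidity: float) -> str:
    """Mirror the firmware: format the NTP epoch as UTC and build the JSON string by hand."""
    sensor_timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(epoch_seconds))
    # Two decimal places, like Arduino's String(float) default
    return (
        f'{{"sensor_timestamp":"{sensor_timestamp}",'
        f'"temperature":{temperature:.2f},'
        f'"humidity":{humidity:.2f}}}'
    )


payload = build_payload(1707177600, 28.5, 71.2)
print(payload)
# → {"sensor_timestamp":"2024-02-06 00:00:00","temperature":28.50,"humidity":71.20}
print(json.loads(payload))  # valid JSON that the Cloud Function can parse

# What a failed sensor read would look like without the NaN check
print(build_payload(1707177600, float("nan"), 71.2))
```

The last case shows why the firmware skips NaN readings: the ESP8266 core prints a NaN float as `nan`, and `json.loads` (like most JSON parsers) rejects that token, so the receiving side could not parse the body.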
Then… voilà! This is how it looks now: the device is running and collecting data. Let’s take a smoke break to celebrate this little achievement🚬.
In this case, I’m using a Cloud Function as the destination for the HTTP request, which lets me handle the incoming data and process it further. This serverless approach simplifies development and lets me focus on the application logic.
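Before deploying, it can help to check the HTTP contract locally: a POST with a `Content-Type: application/json` body, answered with status 200 on success (the only code `sendDataToServer` treats as success). The sketch below uses only Python's standard library; `FakeFunctionHandler` and `post_json` are hypothetical names, and the local server is a stand-in, not the Cloud Function itself.

```python
import json
import threading
import urllib.error
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # payloads the stand-in endpoint accepted


class FakeFunctionHandler(BaseHTTPRequestHandler):
    """Hypothetical local stand-in for the Cloud Function's HTTP endpoint."""

    def do_POST(self):
        body = self.rfile.read(int(self.headers.get("Content-Length", 0)))
        try:
            data = json.loads(body)
            received.append(data)
            status, reply = 200, data
        except json.JSONDecodeError:
            status, reply = 400, {"error": "Invalid JSON"}
        encoded = json.dumps(reply).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(encoded)))
        self.end_headers()
        self.wfile.write(encoded)

    def log_message(self, format, *args):
        pass  # keep the console quiet


def post_json(url: str, payload: str) -> int:
    """POST a JSON string like sendDataToServer() does and return the HTTP status code."""
    request = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # Empty ProxyHandler: talk to localhost directly even if proxy variables are set
    opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
    try:
        with opener.open(request, timeout=15) as response:
            return response.status
    except urllib.error.HTTPError as err:
        return err.code


server = HTTPServer(("127.0.0.1", 0), FakeFunctionHandler)  # port 0 picks a free port
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_port}/"

ok = post_json(url, '{"sensor_timestamp":"2024-02-06 00:00:00","temperature":28.50,"humidity":71.20}')
bad = post_json(url, '{"sensor_timestamp":"2024-02-06 00:00:05","temperature":nan,"humidity":71.20}')
server.shutdown()
server.server_close()
print(ok, bad)  # 200 400
```

Pointing `post_json` at the deployed function's URL is a quick way to exercise the real endpoint the same way the device does.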
Step 2: Storing Data
The ideal approach would have been to use the MQTT protocol with Google Cloud IoT Core as the hub, forwarding messages to Pub/Sub, which would then trigger the Cloud Function. However, since IoT Core has been retired, I simplified this step: the device calls the Cloud Function directly, and the function streams the data into BigQuery using the legacy streaming API (insert_rows_json).
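For comparison, in the Pub/Sub-based design the function would not receive the JSON body directly. A first-generation Pub/Sub-triggered function receives an `event` dict whose `data` field holds the message payload as a base64-encoded string (second-generation functions get it nested under `message.data` in a CloudEvent), so the first step would be decoding it. The sketch below shows only that decoding step; `decode_pubsub_event` and the sample event are hypothetical.

```python
import base64
import json


def decode_pubsub_event(event: dict) -> dict:
    """Return the sensor reading carried in a Pub/Sub event's base64-encoded `data` field."""
    raw = base64.b64decode(event["data"]).decode("utf-8")
    return json.loads(raw)


# Simulate the event a first-generation Pub/Sub-triggered function would receive
reading = {"sensor_timestamp": "2024-02-06 00:00:00", "temperature": 28.5, "humidity": 71.2}
event = {"data": base64.b64encode(json.dumps(reading).encode("utf-8")).decode("ascii")}
print(decode_pubsub_event(event) == reading)  # True
```

From there, building the row and streaming it into BigQuery would work the same way as in the HTTP version.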
from datetime import datetime, timezone

from google.cloud import bigquery

# BigQuery client, created once per instance and reused across requests
client = bigquery.Client()
dataset_id = "YOUR_DATASET"  # replace with your dataset ID
table_id = "YOUR_TABLE"      # replace with your table ID

REQUIRED_FIELDS = ("temperature", "humidity", "sensor_timestamp")


def insert_data(request):
    try:
        # silent=True returns None instead of raising on a missing or invalid JSON body
        request_json = request.get_json(silent=True)
        if not request_json or not all(field in request_json for field in REQUIRED_FIELDS):
            return {"error": f"Invalid data format: {request_json}"}, 400

        # Time the Cloud Function received the reading, in UTC like the sensor timestamp
        now = datetime.now(timezone.utc)

        # Create a BigQuery row to insert
        row = {
            "temperature": request_json["temperature"],
            "humidity": request_json["humidity"],
            "sensor_timestamp": request_json["sensor_timestamp"],
            "cf_timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
            "job_date": now.strftime("%Y-%m-%d"),
        }

        # Stream the row into BigQuery (legacy streaming API); returns a list of row errors
        table_ref = f"{client.project}.{dataset_id}.{table_id}"
        errors = client.insert_rows_json(table_ref, [row])
        if errors:
            return {"error": errors}, 500
        return row
    except Exception as e:
        return {"error": f"An error occurred: {e}"}, 500
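Because the function needs a live BigQuery client, its validation and row-building logic is hard to test on its own. One option, sketched here with a hypothetical `build_row` helper, is to pull that logic into a pure function that takes the request body and the current time; the handler would then only call it and stream the result.

```python
from datetime import datetime, timezone
from typing import Dict, Optional

REQUIRED_FIELDS = ("temperature", "humidity", "sensor_timestamp")
TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def build_row(request_json: Optional[Dict], now: datetime) -> Optional[Dict]:
    """Return the BigQuery row for a valid payload, or None if a required field is missing."""
    if not request_json or not all(field in request_json for field in REQUIRED_FIELDS):
        return None
    return {
        "temperature": request_json["temperature"],
        "humidity": request_json["humidity"],
        "sensor_timestamp": request_json["sensor_timestamp"],
        "cf_timestamp": now.strftime(TS_FORMAT),  # when the function received the reading
        "job_date": now.strftime("%Y-%m-%d"),
    }


# A fixed clock makes the output deterministic
fixed_now = datetime(2024, 2, 6, 0, 0, 3, tzinfo=timezone.utc)
print(build_row({"temperature": 28.5, "humidity": 71.2, "sensor_timestamp": "2024-02-06 00:00:00"}, fixed_now))
print(build_row({"temperature": 28.5}, fixed_now))  # None
```

In `insert_data`, the handler would call `build_row(request.get_json(silent=True), datetime.now(timezone.utc))`, return a 400 response when it gets `None`, and otherwise pass the row to `insert_rows_json`.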
Here’s a preview of the data stored in BigQuery:
Step 3: Retrieving Data
Now that the data is stored in the data warehouse, we need a way for users to interact with it. Giving users direct access to BigQuery is not ideal, so to provide a more user-friendly experience we can use a messaging platform like Telegram through the Telegram Bot API, here via the open-source pyTelegramBotAPI library (imported as telebot).
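Under the hood, the Telegram Bot API is a plain HTTPS API: every method is called at `https://api.telegram.org/bot<token>/<method>`, and sending a text reply is the `sendMessage` method with `chat_id` and `text` parameters. Libraries such as `telebot` wrap these calls. The sketch below only builds such a request with the standard library and does not send it; `build_send_message`, the token, and the chat ID are placeholders.

```python
import json
import urllib.request

API_BASE = "https://api.telegram.org"


def build_send_message(token: str, chat_id: int, text: str) -> urllib.request.Request:
    """Build (but do not send) a Bot API sendMessage call with a JSON body."""
    url = f"{API_BASE}/bot{token}/sendMessage"
    body = json.dumps({"chat_id": chat_id, "text": text}).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )


# Placeholder token and chat ID; nothing is sent over the network here
req = build_send_message("123456:PLACEHOLDER", 42, "Latest Data: 28.5 °C, 71.2 %")
print(req.full_url)      # https://api.telegram.org/bot123456:PLACEHOLDER/sendMessage
print(req.get_method())  # POST
```

With a real token, `urllib.request.urlopen(req)` would deliver the message; a call like `bot.send_message(chat_id, text)` in `telebot` takes care of this request for you.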
Based on the use case, when users start the bot they are greeted with a welcome message and a list of available commands. The “/latest” command shows the most recently ingested reading. For more in-depth analysis, the “/raw” command lets users request raw data as an Excel file, filtered by a minimum date. Finally, the “/summary” command provides a summary of all collected data.
from datetime import date

import pandas as pd
import telebot
from google.cloud import bigquery

# Placeholders: replace with your bot token and BigQuery table location
token = "YOUR_BOT_TOKEN"
project_id = "YOUR_PROJECT_ID"
dataset = "YOUR_DATASET"
table = "YOUR_TABLE"

client = bigquery.Client(project=project_id)
bot = telebot.TeleBot(token)
TABLE_PATH = f"`{project_id}.{dataset}.{table}`"

WELCOME_MSG = """Welcome to the testing Telegram bot!
This is a simple bot to help you access IoT data collected by Agim's ESP8266 device equipped with a DHT22 sensor. The sensor gathers data on the environment's temperature and humidity.
Here are the three options you can choose from:
/latest : Check the latest ingested data.
/raw : Request raw data in a format suitable for import into Excel, with optional date filters you can apply.
/summary : Get a summary of all collected data.
I hope this clarifies the bot's functionality!"""


# Handle the /start command: greet the user and list the commands
@bot.message_handler(commands=['start'])
def greet(message):
    bot.reply_to(message, WELCOME_MSG)


# Handle the /latest command: send the most recently ingested row
@bot.message_handler(commands=['latest'])
def option_1(message):
    query = f"SELECT * FROM {TABLE_PATH} ORDER BY cf_timestamp DESC LIMIT 1"
    rows = list(client.query(query).result())
    if not rows:
        bot.send_message(message.chat.id, "No data has been ingested yet.")
        return
    row = rows[0]
    msg = (
        "Latest Data:\n"
        f"Temperature: {row['temperature']}\n"
        f"Humidity: {row['humidity']}\n"
        f"Job Date: {row['job_date']}\n"
        f"Sensor Time: {row['sensor_timestamp']}\n"
        f"Cloud Function Time: {row['cf_timestamp']}"
    )
    bot.send_message(message.chat.id, msg)


# Handle the /raw command: ask for a minimum date, then reply with an Excel file
@bot.message_handler(commands=['raw'])
def option_2(message):
    text = "Specify the minimum date (YYYY-MM-DD format, e.g., 2024-02-06) for data extraction."
    send_msg = bot.send_message(message.chat.id, text)
    bot.register_next_step_handler(send_msg, raw_data)


def raw_data(message):
    try:
        min_date = date.fromisoformat((message.text or "").strip())
    except ValueError:
        bot.send_message(message.chat.id, "Please use the YYYY-MM-DD format, e.g., 2024-02-06.")
        return
    # Pass the date as a query parameter instead of formatting user input into the SQL
    query = f"SELECT * FROM {TABLE_PATH} WHERE job_date >= @min_date"
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ScalarQueryParameter("min_date", "DATE", min_date)]
    )
    rows = client.query(query, job_config=job_config).result()
    df = pd.DataFrame([dict(row.items()) for row in rows])
    if df.empty:
        bot.send_message(message.chat.id, f"No data found since {min_date}.")
        return
    # Write to Excel using the openpyxl engine, then send the file
    filename = f"Raw Data {min_date}.xlsx"
    df.to_excel(filename, index=False, engine="openpyxl")
    with open(filename, "rb") as attachment:
        bot.send_document(message.chat.id, attachment)


# Handle the /summary command: aggregate all collected data
@bot.message_handler(commands=['summary'])
def option_3(message):
    query = f"""
        SELECT
            AVG(temperature) AS avg_temp,
            AVG(humidity) AS humidity,
            CAST(MAX(DATETIME_DIFF(cf_timestamp, sensor_timestamp, SECOND)) AS STRING) AS max_difference,
            CAST(MIN(job_date) AS STRING) AS first_date,
            CAST(MAX(job_date) AS STRING) AS last_date
        FROM {TABLE_PATH}
    """
    # An aggregate query without GROUP BY always returns exactly one row
    row = list(client.query(query).result())[0]
    if row["avg_temp"] is None:
        bot.send_message(message.chat.id, "No data has been ingested yet.")
        return
    msg = (
        "Here's the summary for all ingested data:\n\n"
        f"Average temperature and humidity for the surrounding area are {row['avg_temp']:.1f} degrees Celsius "
        f"and {row['humidity']:.1f}% humidity. The maximum delay for data to be received from the sensor "
        f"to the Cloud Function was {row['max_difference']} seconds. "
        f"Data was collected from {row['first_date']} to {row['last_date']}.\n\n"
        "*Key metrics*\n"
        f"• _Average Temperature:_ {row['avg_temp']:.1f}°C\n"
        f"• _Average Humidity:_ {row['humidity']:.1f}%\n"
        f"• _Maximum Delay:_ {row['max_difference']} seconds"
    )
    # parse_mode="Markdown" renders *bold* and _italic_ instead of showing the symbols
    bot.send_message(message.chat.id, msg, parse_mode="Markdown")


def main():
    bot.polling()  # keep asking Telegram for new messages


if __name__ == '__main__':
    main()
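The /summary query packs several metrics into one SELECT. To make them concrete, here is the same computation in plain Python over a couple of sample rows; `summarize` and the sample values are hypothetical. For timestamps stored with whole-second precision, as here, `DATETIME_DIFF(..., SECOND)` equals the elapsed seconds, which is what subtracting the two datetimes gives.

```python
from datetime import datetime
from statistics import mean
from typing import Dict, List

TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def summarize(rows: List[Dict]) -> Dict:
    """Compute the /summary metrics: averages, maximum sensor-to-function delay, and date span."""
    delays = [
        (datetime.strptime(r["cf_timestamp"], TS_FORMAT)
         - datetime.strptime(r["sensor_timestamp"], TS_FORMAT)).total_seconds()
        for r in rows
    ]
    return {
        "avg_temp": mean(r["temperature"] for r in rows),
        "avg_humidity": mean(r["humidity"] for r in rows),
        "max_difference": max(delays),  # seconds between sensor time and function time
        "first_date": min(r["job_date"] for r in rows),
        "last_date": max(r["job_date"] for r in rows),
    }


# Hypothetical sample rows shaped like the BigQuery table
rows = [
    {"temperature": 28.0, "humidity": 70.0, "sensor_timestamp": "2024-02-06 00:00:00",
     "cf_timestamp": "2024-02-06 00:00:02", "job_date": "2024-02-06"},
    {"temperature": 29.0, "humidity": 72.0, "sensor_timestamp": "2024-02-07 10:00:00",
     "cf_timestamp": "2024-02-07 10:00:05", "job_date": "2024-02-07"},
]
print(summarize(rows))
```

Because the device clock is only resynchronized with NTP periodically, this "delay" also absorbs any clock drift between the ESP8266 and Google's servers, not just network and processing time.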
The bot code above uses polling (bot.polling()) to check for new messages: it repeatedly calls the Bot API's getUpdates method to ask Telegram whether anything new has arrived. This is easy to set up, but the bot process has to run continuously and keeps sending requests even when no one is using the bot. For bots that need to scale, webhooks, where Telegram pushes each update to your server, are usually more efficient.
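With a webhook, you register an HTTPS URL through the Bot API's `setWebhook` method, and Telegram then POSTs each Update to that URL as JSON, so nothing has to run while the bot is idle. The receiving side, which could be another Cloud Function, only has to pull the chat ID and command out of the Update and route it. The sketch below shows that routing step in plain Python; `route_update`, the reply functions, and the handler table are hypothetical, and a real deployment would also call `setWebhook` and send the replies through the Bot API.

```python
from typing import Callable, Dict, Optional


def reply_latest(chat_id: int) -> str:
    return f"(would query the latest row for chat {chat_id})"


def reply_summary(chat_id: int) -> str:
    return f"(would query the summary for chat {chat_id})"


# Hypothetical command table, mirroring the bot's message handlers
HANDLERS: Dict[str, Callable[[int], str]] = {
    "/latest": reply_latest,
    "/summary": reply_summary,
}


def route_update(update: dict) -> Optional[str]:
    """Pick and run the handler for a Telegram Update delivered to a webhook, if any."""
    message = update.get("message") or {}
    text = message.get("text", "")
    if not text.startswith("/"):
        return None
    # "/latest@MyBot extra" -> "/latest": drop arguments and the optional @botname suffix
    command = text.split()[0].split("@")[0]
    handler = HANDLERS.get(command)
    return handler(message["chat"]["id"]) if handler else None


update = {"update_id": 1, "message": {"message_id": 7, "chat": {"id": 42}, "text": "/latest"}}
print(route_update(update))  # (would query the latest row for chat 42)
```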
With this Telegram bot in place, monitoring the temperature and humidity of your reptile’s enclosure is as simple as sending a few commands to the bot and getting instant updates on the conditions in its environment.