Scheduled Job Queue for Telegram Bots
Outsource your job scheduler to Telegram for automating periodic tasks.
We’ll have a look at the JobQueue feature in Telegram by creating a bot that periodically checks a website and sends notifications to Telegram users or groups when new content is detected.
Here are the steps we’re following:
- Get a Telegram API key from the Telegram BotFather.
- Install the Telegram package using pip.
- Use the Requests library functions to keep track of changes in a webpage’s content.
- Schedule recurring tasks using the Telegram JobQueue feature, such as polling at specific intervals.
- Send a notification message to the user via the Telegram API uwhen a change in the webpage content is detected.
Hold on … outsourcing your task scheduler?
There are numerous packages in Python that provide features for scheduling and executing tasks.
Some of the popular ones include Celery, APScheduler and Schedule.
These packages allow you to run tasks at specific times or intervals, monitor task status, and retry failed tasks.
However, they don’t integrate directly with the Telegram messaging platform like Telegram JobQueue does.
If you’re looking to schedule tasks specifically for your Telegram bot, the Telegram JobQueue is likely your best bet.
Here’s a quick overview of each package:
- APScheduler: is a lightweight task scheduling library that allows you to schedule and run functions at specified intervals. It supports several types of triggers including date-based and interval-based triggers and provides a variety of scheduling options such as running tasks in a separate thread or process.
- Schedule: Schedule is a simple and lightweight task scheduling library that allows you to schedule functions to run at specified intervals. It has a very simple API and doesn’t require any external dependencies. Schedule is often used for running simple tasks such as sending notifications or performing system maintenance tasks.
- Celery: Celery is a widely used task queue implementation in Python. It allows you to distribute tasks across worker nodes and execute them asynchronously. Celery supports a variety of brokers such as RabbitMQ, Redis, and Amazon SQS, and provides tools for monitoring and scaling your tasks. It also supports task result storage and task retrying.
Scraping websites, a Beautiful Soup …
The code we’ve got here is pretty straightforward. It will send an alert message for even the slightest change on a webpage. While this may work for some cases, it may not be suitable for monitoring larger websites or frequent changes.
For those who want to monitor website changes more efficiently, there are alternative tools like Beautiful Soup.
This tool can scrape the HTML content of a webpage and extract specific information, like the title, to compare it against the previous version. This approach can be more effective in detecting significant changes, rather than triggering an alert message for every minor change.
If you’re monitoring RSS feeds, the current code can still be useful since changes are less frequent.
Understanding Telegram’s JobQueue
""" Running every 10 seconds """
context.job_queue.run_repeating(a_repeating_job, interval=10, first=0)
The job_queue.run_repeating()
allows us to schedule recurring tasks at specified intervals. This function takes two main arguments: callback
, which is the function that should be executed at each interval, and interval
, which is the time interval (in seconds) between each execution of the callback
function.
One feature of job_queue.run_repeating()
is that it accepts a first
argument, which specifies the time delay (in seconds) before the first execution of the callback
function. This can be useful if you want to delay the initial execution of the task for some reason.
The function returns an instance of the telegram.ext.Job
class, which can be used to manipulate the scheduled task. For example, the job.schedule_removal()
method can be used to remove a specific task from the job queue.
""" Running on Mon, Tue, Wed, Thu, Fri = tuple(range(5)) at 10:00 UTC time """
t = datetime.time(hour=10, minute=00, second=00)
context.job_queue.run_daily(a_repeating_job, t, days=tuple(range(5)))
Telegram’s job queue function also includes the options of run_daily
and run_once
.
Here we can schedule tasks to run at specific times each day or to run only once at a specified time.
With run_daily
, the callback function will be executed every day at the specified time, while run_once
will execute the callback function only once at the specified time.
How to get a Telegram API key
If you want to get a Telegram API key, you have to create a Telegram bot.
Here are the steps you need to follow:
- Open the Telegram app on your device or go to the Telegram website and log in.
- Search for the “BotFather” bot and start a chat with it.
- Type “/newbot” and follow the instructions to create a new bot. You will need to give your bot a name and a username.
- When your bot is created, the BotFather will give you a token.
This token serves as your Telegram API key. - Copy the API key and save it in a safe place.
Note that the API key should be kept private. It allows anyone with access to it to send messages on behalf of your bot.
Installing the Telegram package using pip
If you want to interact with Telegram bots, you can use the python-telegram-bot
package. This package provides an interface to the Telegram Bot API.
To get started, you can use the following command in the terminal:
pip3 install python-telegram-bot
Setting up our Telegram bot
Let’s get started by creating a file called ‘telegramjobque.py’ and importing the necessary modules.
We’ll also create a main() function to handle two commands, ‘/start_check’ and ‘/stop_check’.
So, without further ado, let’s dive right in!
telegramjobque.py file
In your code, import:
import requests
from telegram import Update
from telegram.ext import Updater, CommandHandler, CallbackContext
The requests
module is a third-party package that allows users to send HTTP/1.1 requests to websites and obtain data from them. It is commonly used for web scraping and accessing web APIs.
The Update
class is a part of the telegram
package that represents an incoming update from Telegram, such as a new message or an edited message.
The Updater
class is a part of the telegram.ext
package, which is an extension of the telegram
package. The Updater
class is used to continuously fetch updates from the Telegram server, allowing the bot to receive and respond to messages in real-time.
The CommandHandler
class is used to handle user commands in the bot. It takes in the name of the command (including the '/' prefix) and the function that should be executed when the command is called.
The CallbackContext
class is used to store the context of the callback function that is executed when a command or message is received by the bot. This context can include information about the update, the bot, and any custom data that has been added to the context.
This code defines the main()
function that sets up a Telegram bot using the python-telegram-bot
package.
def main():
"""Main function to start the bot and handle commands"""
# Create the Updater and pass it your bot's token.
updater = Updater("<<your_telegram_api_key>>")
# Get the dispatcher to register handlers
dispatcher = updater.dispatcher
# Add command handlers for /start_check and /stop_check
dispatcher.add_handler(CommandHandler("start_check", start_check_command))
dispatcher.add_handler(CommandHandler("stop_check", stop_check_command))
# Start the Bot
updater.start_polling()
# Run the bot until you press Ctrl-C or the process receives SIGINT,
# SIGTERM or SIGABRT. This should be used most of the time, since
# start_polling() is non-blocking and will stop the bot gracefully.
updater.idle()
if __name__ == '__main__':
main()
Here’s a breakdown of what the code does:
- It creates an instance of the
Updater
class, passing in the Telegram API key. - It gets the
dispatcher
object from theUpdater
instance, which is used to register handlers for commands. - It registers two command handlers for
/start_check
and/stop_check
, respectively. These handlers are functions that will be called when the corresponding commands are issued to the bot. - It starts the bot using the
start_polling()
method of theUpdater
instance. This method starts a long polling loop that will listen for incoming messages and handle them appropriately. - It calls
updater.idle()
to keep the program running until the user manually stops the program (e.g. by pressing Ctrl-C).
Overall, the code sets up a Telegram bot that can handle two commands:/start_check
and /stop_check
.
When the /start_check
command is issued, the start_check_command()
function will be called.
Similarly, when the /stop_check
command is issued, the stop_check_command()
function will be called.
— — — — — — — — — — —
Picard: “Data, we need to talk. We’ve decided to replace you with a robot.”
Data: “I don’t understand, sir. I am a robot.”
Picard: “Yes, but we found one that’s cheaper and doesn’t quote Shakespeare all the time."
— — — — — — — — — — —
Creating a repeating polling job for a Telegram bot
Here we set up a Telegram bot to check for changes on a website on a regular basis.
The start_check_command()
function sets up a job that polls the website every 10 seconds and saves the job details in chat_data. If a job is already running, the user will be notified.
The stop_check_command()
function stops the current job when it is executed, removing it from the job queue and chat_data.
The RSS feed https://medium.com/feed/@steve.dua is checked by default, but you have the option to instruct the bot to verify a different URL.
The start_check_command()
function that sets up the polling interval.
def start_check_command(update: Update, context: CallbackContext) -> None:
"""Handler function for the /start_check command"""
# Use the global variables
global previous_content
global webpage_url
polling_interval = 10
# Get the polling job from the chat_data
job = context.chat_data.get('polling_job')
if job:
# If a polling job is already running, send a message to notify the users
update.message.reply_text('A polling job is already running!')
else:
# Set the previous_content to an empty string and set the webpage_url to the input argument if it is a valid URL
previous_content = ""
webpage_url = context.args[0] if len(context.args) > 0 and is_url_valid(context.args[0]) else "https://medium.com/feed/@steve.dua"
# Start the repeating job to check the website content every 10 seconds
job = context.job_queue.run_repeating(send_message_when_website_content_has_changed, interval=polling_interval, first=0, context=update.message.chat_id)
# Store the job in the chat_data for later reference
context.chat_data['polling_job'] = job
# Send a message to confirm the polling job has started
update.message.reply_text(f'Polling job every {polling_interval} seconds for the site \n{webpage_url} \nhas started!')
This code defines the function start_check_command
, which is a handler function for the /start_check
command in a Telegram bot.
The function starts by declaring two global variables, previous_content
and webpage_url
.
Then, it sets a polling interval of 10 seconds and tries to retrieve the polling job from the chat_data. If a job is already running, the function replies with a message notifying the user. Otherwise, it sets the previous_content
to an empty string and sets the webpage_url
to the input argument if it is a valid URL; otherwise, it sets it to a default URL.
The is_url_valid()
function checks if the URL is valid or not.
def is_url_valid(url_to_check):
try:
# Send a GET request to the URL and return True if the response status code is 200
return requests.get(url_to_check).status_code == 200
except:
# If there is an error retrieving the URL, return False
return False
After that, it starts a repeating job using the run_repeating
method of job_queue
to check the website content every 10 seconds. The job is defined by the function send_message_when_website_content_has_changed
, which we will discuss later in this article.
The function stores the job in the chat_data for later reference and sends a message to confirm that the polling job has started, including the polling interval and the URL being checked.
The stop_check_command()
function to handle the /stop_check
command.
def stop_check_command(update: Update, context: CallbackContext) -> None:
"""Handler function for the /stop_check command"""
job = context.chat_data.get('polling_job')
# Cancel the job if it exists
if job:
job.schedule_removal() # remove the repeating job from the job queue
del context.chat_data['polling_job'] # remove the job from the context
update.message.reply_text('Polling job has stopped!') # send a message to confirm the polling job has stopped
else:
update.message.reply_text('There is no active polling job!') # send a message indicating that there is no active job
This is a handler function for the /stop_check
command.
When this command is executed, it will stop the current polling job that is checking for website changes.
The function starts by getting the current job from the chat data.
If the job exists, it cancels the job using job.schedule_removal()
, removes the job from the context with del context.chat_data['polling_job']
, and sends a message to confirm that the polling job has stopped with update.message.reply_text('Polling job has stopped!')
.
If there is no active polling job, the function sends a message indicating that there is no active job with update.message.reply_text('There is no active polling job!')
.
Monitor website content changes
This function uses a global variable to keep track of the previous content of the website, and sends a GET request to the website to retrieve the current content. It then compares the current content to the previous content to see if there has been any changes since the last check.
If changes are detected, the function sends a message to the chat to let users know about the update.
If there’s an error retrieving the website, the function sends an error message to the chat instead.
The reason for repeatedly calling this function is to keep the chat updated with any changes in the website’s content.
def send_message_when_website_content_has_changed(context: CallbackContext):
# Use the global variable
global previous_content
# Send a GET request to the website
try:
response = requests.get(webpage_url)
# Check if the response status code is 200 (success)
if response.status_code == 200:
current_content = response.text
# Check if the content has changed since the last poll
if previous_content != current_content:
# If the content has changed, send a message to notify the users
if previous_content != "":
context.bot.send_message(chat_id=context.job.context, text=f'The website content \n{webpage_url} \n has changed!')
# Update the chat_data with the new content
previous_content = current_content
except:
# If there is an error retrieving the website, send an error message
context.bot.send_message(chat_id=context.job.context, text= f'Error retrieving {webpage_url}')
This is a function that is called repeatedly every 10 seconds by a job created by the /start_check
command. The purpose of this function is to send a message to the chat when the content of a website specified by the user changes.
The function first defines previous_content
as a global variable to keep track of the previous content of the website. It then sends a GET request to the website specified by the webpage_url
global variable to retrieve the current content.
If the response status code is 200 (which means the GET request was successful), the function compares the current content to the previous content to see if there has been any changes since the last poll.
If the content has changed, the function sends a message to the chat to notify the users.
Finally, if there is an error retrieving the website, the function sends an error message to the chat.
python3 telegramjobque.py
output:
Heroku
To deploy your robot on Heroku, you’ll need to make changes to the bot to utilize polling instead of webhooks for retrieving new data.
Fortunately, this is easily accomplished by making some minor adjustments to the code.
A helpful resource for implementing these changes can be found here.
Conclusion
Telegram Job Queue is a cool tool that can help you schedule repeating tasks and send instant notifications to you, users or groups.
It’s user-friendly and can be set up with only a few lines of code.
Plus, it’s perfect for scheduling tasks specifically for Telegram bots.
But if you need more advanced functionalities for task execution, there are other Python packages available like Celery, APScheduler, and Schedule.
Thanks for the read!
References
- “APScheduler Documentation” by R. Silva, B. Burnes, et al.
Available at: https://apscheduler.readthedocs.io/en/stable/ - “Schedule Documentation” by Seth M. Morton.
Available at: https://schedule.readthedocs.io/en/stable/ - “Celery Documentation” by Celery Contributors.
Available at: https://docs.celeryproject.org/en/stable/index.html - “BeautifulSoup” by Leonard Richardson.
Available at: https://www.crummy.com/software/BeautifulSoup/bs4/doc/ - How to Deploy a Telegram Bot using Heroku by Haohui.
Available at: How to Deploy a Telegram Bot using Heroku - “Telegram Bot API” by Telegram.
Available at: https://core.telegram.org/bots/api - “python-telegram-bot Documentation” by python-telegram-bot contributors.
Available at: https://python-telegram-bot.readthedocs.io/en/stable/
Congratulations, you made it to the end!
Get the code on Github here:
Spot Binance API:
https://github.com/stevebelgium/telegramjobque
If you enjoyed this, please clap and follow me on Medium