Scheduled Job Queue for Telegram Bots

Outsource your job scheduler to Telegram for automating periodic tasks.

Steve Dua
11 min read · Apr 3, 2023
Image generated on Midjourney. Prompt: A production line with robots made of paper

We’ll have a look at the JobQueue feature of the python-telegram-bot library by creating a bot that periodically checks a website and sends notifications to Telegram users or groups when new content is detected.

Here are the steps we’re following:

  • Get a Telegram API key from the Telegram BotFather.
  • Install the Telegram package using pip.
  • Use the Requests library functions to keep track of changes in a webpage’s content.
  • Schedule recurring tasks using the Telegram JobQueue feature, such as polling at specific intervals.
  • Send a notification message to the user via the Telegram API when a change in the webpage content is detected.

Hold on … outsourcing your task scheduler?

There are numerous packages in Python that provide features for scheduling and executing tasks.
Some of the popular ones include Celery, APScheduler and Schedule.
These packages allow you to run tasks at specific times or intervals, monitor task status, and retry failed tasks.
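However, they don’t integrate directly with a Telegram bot the way the JobQueue in python-telegram-bot does: its job callbacks receive the same kind of context object as your command handlers, so a scheduled job can send messages through the bot right away.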
If you’re looking to schedule tasks specifically for your Telegram bot, the Telegram JobQueue is likely your best bet.

Here’s a quick overview of each package:

  • APScheduler: APScheduler is a lightweight task scheduling library that allows you to schedule and run functions at specified intervals. It supports several types of triggers, including date-based and interval-based triggers, and provides a variety of scheduling options such as running tasks in a separate thread or process.
  • Schedule: Schedule is a simple and lightweight task scheduling library that allows you to schedule functions to run at specified intervals. It has a very simple API and doesn’t require any external dependencies. Schedule is often used for running simple tasks such as sending notifications or performing system maintenance tasks.
  • Celery: Celery is a widely used task queue implementation in Python. It allows you to distribute tasks across worker nodes and execute them asynchronously. Celery supports a variety of brokers such as RabbitMQ, Redis, and Amazon SQS, and provides tools for monitoring and scaling your tasks. It also supports task result storage and task retrying.

Scraping websites, a Beautiful Soup …

The code we’ve got here is pretty straightforward. It will send an alert message for even the slightest change on a webpage. While this may work for some cases, it may not be suitable for monitoring larger websites or frequent changes.

For those who want to monitor website changes more efficiently, there are alternative tools like Beautiful Soup.
This tool can scrape the HTML content of a webpage and extract specific information, like the title, to compare it against the previous version. This approach can be more effective in detecting significant changes, rather than triggering an alert message for every minor change.

If you’re monitoring RSS feeds, the current code can still be useful since changes are less frequent.
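
To make the title-comparison idea a bit more concrete, here is a minimal sketch. It uses the standard library's html.parser instead of Beautiful Soup so that it runs without extra dependencies, and the names TitleParser, extract_title and title_has_changed are made up for this example.

```python
from html.parser import HTMLParser


class TitleParser(HTMLParser):
    """Collects the text inside the first <title> element."""

    def __init__(self):
        super().__init__()
        self._in_title = False
        self._done = False
        self.title = ""

    def handle_starttag(self, tag, attrs):
        if tag == "title" and not self._done:
            self._in_title = True

    def handle_endtag(self, tag):
        if tag == "title" and self._in_title:
            self._in_title = False
            self._done = True

    def handle_data(self, data):
        if self._in_title:
            self.title += data


def extract_title(html):
    """Return the page title, or an empty string if there is none."""
    parser = TitleParser()
    parser.feed(html)
    return parser.title.strip()


def title_has_changed(previous_html, current_html):
    """Compare only the titles, ignoring changes elsewhere on the page."""
    return extract_title(previous_html) != extract_title(current_html)
```

With a check like this, a rotating advert or a timestamp in the page body would no longer trigger an alert; only a new title would.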

Understanding Telegram’s JobQueue

# Run every 10 seconds, starting immediately
context.job_queue.run_repeating(a_repeating_job, interval=10, first=0)

The job_queue.run_repeating() method allows us to schedule recurring tasks at specified intervals. It takes two main arguments: callback, which is the function that should be executed at each interval, and interval, which is the time (in seconds) between executions of the callback function.

One feature of job_queue.run_repeating() is that it accepts a first argument, which specifies the time delay (in seconds) before the first execution of the callback function. This can be useful if you want to delay the initial execution of the task for some reason.
The function returns an instance of the telegram.ext.Job class, which can be used to manipulate the scheduled task. For example, the job.schedule_removal() method can be used to remove a specific task from the job queue.
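
As an illustration of how a job can be removed again, here is a minimal sketch of a job callback that removes itself after a few runs. It assumes the version 13 API of python-telegram-bot, where the payload passed to run_repeating() via the context argument is available as context.job.context; MAX_RUNS, limited_job and schedule_limited_job are names made up for this example.

```python
MAX_RUNS = 3  # hypothetical limit, chosen for this example


def limited_job(context):
    """Job callback that removes itself after MAX_RUNS executions.

    `context` is the CallbackContext that python-telegram-bot (v13) passes to
    job callbacks; `context.job.context` is the payload given to run_repeating().
    """
    state = context.job.context
    state["runs"] += 1
    if state["runs"] >= MAX_RUNS:
        # Ask the job queue to drop this job once the current run is finished
        context.job.schedule_removal()


def schedule_limited_job(job_queue):
    """Schedule limited_job every 10 seconds, starting immediately."""
    return job_queue.run_repeating(
        limited_job, interval=10, first=0, context={"runs": 0}
    )
```

Inside a command handler you would call schedule_limited_job(context.job_queue).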

import datetime

# Run Monday to Friday (days 0-4 in python-telegram-bot v13, i.e. tuple(range(5))) at 10:00 UTC
t = datetime.time(hour=10, minute=0, second=0)
context.job_queue.run_daily(a_repeating_job, t, days=tuple(range(5)))

The job queue also offers run_daily and run_once.
With these we can schedule tasks to run at a specific time each day or to run only once.
With run_daily, the callback function is executed at the specified time on each of the selected days (every day by default), while run_once executes the callback function a single time, either at a given time or after a given delay.
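
The two calls can be sketched side by side as follows. This is a minimal example under the assumption of the version 13 API, where day 0 is Monday; the function name schedule_examples and the 30 second delay are made up for illustration.

```python
import datetime

WEEKDAYS = tuple(range(5))  # 0-4 = Monday to Friday in python-telegram-bot v13


def schedule_examples(job_queue, callback):
    """Register one daily and one one-off job on the given job queue."""
    # Every weekday at 10:00; the timezone is stated explicitly as UTC
    ten_utc = datetime.time(hour=10, tzinfo=datetime.timezone.utc)
    daily_job = job_queue.run_daily(callback, time=ten_utc, days=WEEKDAYS)

    # Once, 30 seconds from now
    once_job = job_queue.run_once(callback, when=30)
    return daily_job, once_job
```

Both calls return a Job object, so either job can later be cancelled with schedule_removal().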

How to get a Telegram API key

If you want to get a Telegram API key, you have to create a Telegram bot.
Here are the steps you need to follow:

  1. Open the Telegram app on your device or go to the Telegram website and log in.
  2. Search for the “BotFather” bot and start a chat with it.
  3. Type “/newbot” and follow the instructions to create a new bot. You will need to give your bot a name and a username.
  4. When your bot is created, the BotFather will give you a token.
    This token serves as your Telegram API key.
  5. Copy the API key and save it in a safe place.
Screenshot: BotFather Telegram bot

Note that the API key should be kept private. It allows anyone with access to it to send messages on behalf of your bot.
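
One common way to keep the key out of your source code is to read it from an environment variable. The following is a minimal sketch; the variable name TELEGRAM_BOT_TOKEN and the helper load_bot_token are assumptions made for this example, not something Telegram prescribes.

```python
import os


def load_bot_token(env=os.environ, name="TELEGRAM_BOT_TOKEN"):
    """Return the bot token from the environment, or raise if it is missing."""
    token = env.get(name, "").strip()
    if not token:
        raise RuntimeError(f"Set the {name} environment variable to your bot token")
    return token
```

The result can then be passed to the bot instead of a hard-coded string, for example Updater(load_bot_token()).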

Installing the Telegram package using pip

If you want to interact with Telegram bots, you can use the python-telegram-bot package. This package provides an interface to the Telegram Bot API.

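To get started, install it from the terminal. The code in this article uses the version 13 interface of the package (Updater and dispatcher), which was replaced in version 20, so the command below pins the version accordingly. It also installs the requests package, which we need for fetching the webpage:

pip3 install "python-telegram-bot>=13,<20" requests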
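
If you are not sure which version pip installed, a quick check like the following can help. This is a minimal sketch with made-up helper names; it assumes, as the imports further down suggest, that the article's code targets version 13 (the Updater and dispatcher interface was replaced in version 20).

```python
from importlib import metadata


def major_version(version):
    """Return the major version number from a string such as '13.15'."""
    return int(version.split(".")[0])


def installed_ptb_major():
    """Return the installed python-telegram-bot major version, or None."""
    try:
        return major_version(metadata.version("python-telegram-bot"))
    except metadata.PackageNotFoundError:
        return None
```

If installed_ptb_major() returns 20 or higher, the examples below will need adapting before they run.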

Setting up our Telegram bot

Let’s get started by creating a file called ‘telegramjobque.py’ and importing the necessary modules.
We’ll also create a main() function to handle two commands, ‘/start_check’ and ‘/stop_check’.
So, without further ado, let’s dive right in!

telegramjobque.py file

In your code, import:

import requests
from telegram import Update
from telegram.ext import Updater, CommandHandler, CallbackContext

The requests module is a third-party package that allows users to send HTTP/1.1 requests to websites and obtain data from them. It is commonly used for web scraping and accessing web APIs.

The Update class is a part of the telegram package that represents an incoming update from Telegram, such as a new message or an edited message.

The Updater class is a part of the telegram.ext package, which is an extension of the telegram package. The Updater class is used to continuously fetch updates from the Telegram server, allowing the bot to receive and respond to messages in real-time.

The CommandHandler class is used to handle user commands in the bot. It takes the name of the command (without the leading '/') and the function that should be executed when the command is called.

The CallbackContext class is used to store the context of the callback function that is executed when a command or message is received by the bot. This context can include information about the update, the bot, and any custom data that has been added to the context.
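
To show how these pieces fit together, here is a minimal sketch of a command handler. The /greet command and the function name greet_command are made up for this example; the update and context parameters are the Update and CallbackContext objects described above.

```python
def greet_command(update, context):
    """Handler for a hypothetical /greet command.

    update:  telegram.Update describing the incoming message
    context: telegram.ext.CallbackContext; context.args holds the words
             the user typed after the command
    """
    name = " ".join(context.args) if context.args else "stranger"
    update.message.reply_text(f"Hello, {name}!")


# Registration (v13 API); note that the command name has no leading slash:
# dispatcher.add_handler(CommandHandler("greet", greet_command))
```

Sending "/greet Jean-Luc" to the bot would then produce the reply "Hello, Jean-Luc!".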

This code defines the main() function that sets up a Telegram bot using the python-telegram-bot package.

def main():
    """Main function to start the bot and handle commands"""

    # Create the Updater and pass it your bot's token.
    updater = Updater("<<your_telegram_api_key>>")

    # Get the dispatcher to register handlers
    dispatcher = updater.dispatcher

    # Add command handlers for /start_check and /stop_check
    dispatcher.add_handler(CommandHandler("start_check", start_check_command))
    dispatcher.add_handler(CommandHandler("stop_check", stop_check_command))

    # Start the Bot
    updater.start_polling()

    # Run the bot until you press Ctrl-C or the process receives SIGINT,
    # SIGTERM or SIGABRT. This should be used most of the time, since
    # start_polling() is non-blocking and will stop the bot gracefully.
    updater.idle()


if __name__ == '__main__':
    main()

Here’s a breakdown of what the code does:

  • It creates an instance of the Updater class, passing in the Telegram API key.
  • It gets the dispatcher object from the Updater instance, which is used to register handlers for commands.
  • It registers two command handlers for /start_check and /stop_check, respectively. These handlers are functions that will be called when the corresponding commands are issued to the bot.
  • It starts the bot using the start_polling() method of the Updater instance. This method starts a long polling loop that will listen for incoming messages and handle them appropriately.
  • It calls updater.idle() to keep the program running until the user manually stops the program (e.g. by pressing Ctrl-C).

Overall, the code sets up a Telegram bot that can handle two commands:
/start_check and /stop_check.
When the /start_check command is issued, the start_check_command() function will be called.
Similarly, when the /stop_check command is issued, the stop_check_command() function will be called.

— — — — — — — — — — —
Picard: “Data, we need to talk. We’ve decided to replace you with a robot.”
Data: “I don’t understand, sir. I am a robot.”
Picard: “Yes, but we found one that’s cheaper and doesn’t quote Shakespeare all the time.”
— — — — — — — — — — —

Creating a repeating polling job for a Telegram bot

Here we set up a Telegram bot to check for changes on a website on a regular basis.

The start_check_command() function sets up a job that polls the website every 10 seconds and saves the job details in chat_data. If a job is already running, the user will be notified.
The stop_check_command() function stops the current job when it is executed, removing it from the job queue and chat_data.

The RSS feed https://medium.com/feed/@steve.dua is checked by default, but you have the option to instruct the bot to verify a different URL.

The start_check_command() function sets up the polling job:

def start_check_command(update: Update, context: CallbackContext) -> None:
    """Handler function for the /start_check command"""

    # Use the global variables
    global previous_content
    global webpage_url

    polling_interval = 10

    # Get the polling job from the chat_data
    job = context.chat_data.get('polling_job')

    if job:
        # If a polling job is already running, send a message to notify the users
        update.message.reply_text('A polling job is already running!')
    else:
        # Set the previous_content to an empty string and set the webpage_url to the input argument if it is a valid URL
        previous_content = ""
        webpage_url = context.args[0] if len(context.args) > 0 and is_url_valid(context.args[0]) else "https://medium.com/feed/@steve.dua"

        # Start the repeating job to check the website content every 10 seconds
        job = context.job_queue.run_repeating(send_message_when_website_content_has_changed, interval=polling_interval, first=0, context=update.message.chat_id)

        # Store the job in the chat_data for later reference
        context.chat_data['polling_job'] = job

        # Send a message to confirm the polling job has started
        update.message.reply_text(f'Polling job every {polling_interval} seconds for the site \n{webpage_url} \nhas started!')

This code defines the function start_check_command, which is a handler function for the /start_check command in a Telegram bot.

The function starts by declaring two global variables, previous_content and webpage_url.

Then, it sets a polling interval of 10 seconds and tries to retrieve the polling job from the chat_data. If a job is already running, the function replies with a message notifying the user. Otherwise, it sets the previous_content to an empty string and sets the webpage_url to the input argument if it is a valid URL; otherwise, it sets it to a default URL.

The is_url_valid() function checks if the URL is valid or not.

def is_url_valid(url_to_check):
    try:
        # Send a GET request to the URL and return True if the response status code is 200
        # (the timeout keeps the bot from hanging on an unresponsive server)
        return requests.get(url_to_check, timeout=10).status_code == 200
    except (requests.RequestException, ValueError):
        # If there is an error retrieving the URL, return False
        return False
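
Since this check sends a real request, it can be worth rejecting obviously malformed input first. The following is a minimal sketch of such a pre-check using only the standard library; the helper name looks_like_http_url is made up for this example, and it is meant to complement is_url_valid(), not replace it.

```python
from urllib.parse import urlparse


def looks_like_http_url(text):
    """Return True if text parses as an http(s) URL with a host name."""
    try:
        parts = urlparse(text)
    except ValueError:
        # urlparse rejects some malformed inputs, e.g. an unclosed IPv6 bracket
        return False
    return parts.scheme in ("http", "https") and bool(parts.netloc)
```

A handler could then call is_url_valid() only when looks_like_http_url() returns True.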

After that, it starts a repeating job using the run_repeating method of job_queue to check the website content every 10 seconds. The job is defined by the function send_message_when_website_content_has_changed, which we will discuss later in this article.

The function stores the job in the chat_data for later reference and sends a message to confirm that the polling job has started, including the polling interval and the URL being checked.
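
One thing to keep in mind: previous_content and webpage_url are module-level globals, so every chat that talks to the bot shares them. A possible alternative, sketched below under the assumption of the version 13 API, is to pass a small per-chat dictionary as the job's context. The helper names build_job_state and start_polling_job and the dictionary keys are made up for this example.

```python
DEFAULT_URL = "https://medium.com/feed/@steve.dua"


def build_job_state(chat_id, args, default_url=DEFAULT_URL):
    """Create the per-chat state that travels with the polling job."""
    url = args[0] if args else default_url
    return {"chat_id": chat_id, "url": url, "previous_content": ""}


def start_polling_job(job_queue, callback, chat_id, args, interval=10):
    """Schedule callback for one chat; the state is available as context.job.context."""
    state = build_job_state(chat_id, args)
    return job_queue.run_repeating(callback, interval=interval, first=0, context=state)
```

The job callback would then read the URL and the previous content from context.job.context instead of from the globals, so two chats can watch two different pages at the same time.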

The stop_check_command() function handles the /stop_check command:

def stop_check_command(update: Update, context: CallbackContext) -> None:
    """Handler function for the /stop_check command"""

    job = context.chat_data.get('polling_job')

    # Cancel the job if it exists
    if job:
        job.schedule_removal()  # remove the repeating job from the job queue
        del context.chat_data['polling_job']  # remove the job from the context
        update.message.reply_text('Polling job has stopped!')  # send a message to confirm the polling job has stopped
    else:
        update.message.reply_text('There is no active polling job!')  # send a message indicating that there is no active job

This is a handler function for the /stop_check command.
When this command is executed, it will stop the current polling job that is checking for website changes.

The function starts by getting the current job from the chat data.
If the job exists, it cancels the job using job.schedule_removal(), removes the job from the context with del context.chat_data['polling_job'], and sends a message to confirm that the polling job has stopped with update.message.reply_text('Polling job has stopped!').

If there is no active polling job, the function sends a message indicating that there is no active job with update.message.reply_text('There is no active polling job!').
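
An alternative to keeping the job object in chat_data is to give the job a name when scheduling it and look it up by that name later. The sketch below assumes the version 13 API, where run_repeating() accepts a name argument and the job queue offers get_jobs_by_name(); the helper name remove_jobs_by_name and the choice of the chat ID as job name are assumptions for illustration.

```python
def remove_jobs_by_name(name, job_queue):
    """Remove all jobs with the given name; return how many were removed."""
    jobs = job_queue.get_jobs_by_name(name)
    for job in jobs:
        job.schedule_removal()
    return len(jobs)


# Scheduling with a name (inside a handler):
# context.job_queue.run_repeating(callback, interval=10, first=0,
#                                 context=chat_id, name=str(chat_id))
# Stopping it again:
# remove_jobs_by_name(str(chat_id), context.job_queue)
```

A return value of 0 tells the handler that there was no active job to stop.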

Monitor website content changes

The send_message_when_website_content_has_changed() function uses a global variable to keep track of the previous content of the website, and sends a GET request to the website to retrieve the current content. It then compares the current content to the previous content to see if there have been any changes since the last check.
If changes are detected, the function sends a message to the chat to let users know about the update.
If there’s an error retrieving the website, the function sends an error message to the chat instead.

The reason for repeatedly calling this function is to keep the chat updated with any changes in the website’s content.

def send_message_when_website_content_has_changed(context: CallbackContext):
    """Job callback: notify the chat when the webpage content has changed"""

    # Use the global variable
    global previous_content

    try:
        # Send a GET request to the website
        response = requests.get(webpage_url, timeout=10)

        # Check if the response status code is 200 (success)
        if response.status_code == 200:
            current_content = response.text

            # Check if the content has changed since the last poll
            if previous_content != current_content:

                # If the content has changed, send a message to notify the users
                # (skipped on the first poll, when there is nothing to compare against)
                if previous_content != "":
                    context.bot.send_message(chat_id=context.job.context, text=f'The website content \n{webpage_url} \n has changed!')

                # Remember the new content for the next poll
                previous_content = current_content

    except requests.RequestException:
        # If there is an error retrieving the website, send an error message
        context.bot.send_message(chat_id=context.job.context, text=f'Error retrieving {webpage_url}')

This is a function that is called repeatedly every 10 seconds by a job created by the /start_check command. The purpose of this function is to send a message to the chat when the content of a website specified by the user changes.

The function first defines previous_content as a global variable to keep track of the previous content of the website. It then sends a GET request to the website specified by the webpage_url global variable to retrieve the current content.

If the response status code is 200 (which means the GET request was successful), the function compares the current content to the previous content to see if there have been any changes since the last poll.
If the content has changed, the function sends a message to the chat to notify the users.
Finally, if there is an error retrieving the website, the function sends an error message to the chat.
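
Keeping the full page text around just to compare it is not strictly necessary. As a possible refinement, the comparison could work on a hash of the content instead. This is a minimal sketch using the standard library's hashlib; the function names fingerprint and content_changed and the state dictionary are made up for this example.

```python
import hashlib


def fingerprint(text):
    """Return a short, fixed-size digest of the page content."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def content_changed(state, current_text):
    """Update state['previous'] and report whether the content changed.

    The very first poll only records the fingerprint and returns False,
    mirroring the empty-string check in the function above.
    """
    current = fingerprint(current_text)
    previous = state.get("previous")
    state["previous"] = current
    return previous is not None and previous != current
```

The job callback would call content_changed(state, response.text) and send the notification only when it returns True.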

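Start the bot from the terminal:

python3 telegramjobque.py

You can instruct the bot to verify a different URL by passing it as an argument to the command, for example /start_check followed by the address of another feed.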

Heroku

To deploy your bot on Heroku, you’ll need to change it to use webhooks instead of polling for retrieving new data.
Fortunately, this is easily accomplished by making some minor adjustments to the code.
A helpful resource for implementing these changes can be found here.
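
As a rough idea of what running the bot in webhook mode can look like, here is a minimal sketch. It assumes python-telegram-bot 13.4 or later (where start_webhook() accepts a webhook_url argument); the function name start_with_webhook and its parameters are made up for this example, and base_url stands for the public HTTPS address of your app.

```python
def start_with_webhook(updater, token, port, base_url):
    """Start the bot in webhook mode instead of long polling (v13 API).

    base_url is the public HTTPS address of the app, for example the URL
    Heroku assigns to it; port is the port the platform tells the app to bind.
    """
    updater.start_webhook(
        listen="0.0.0.0",
        port=port,
        url_path=token,  # the token in the path keeps the endpoint hard to guess
        webhook_url=f"{base_url.rstrip('/')}/{token}",
    )
```

In main(), this call would take the place of updater.start_polling(), with the port read from the PORT environment variable that Heroku provides; updater.idle() stays as it is.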

Conclusion

Telegram Job Queue is a cool tool that can help you schedule repeating tasks and send instant notifications to you, users or groups.
It’s user-friendly and can be set up with only a few lines of code.
Plus, it’s perfect for scheduling tasks specifically for Telegram bots.

But if you need more advanced functionalities for task execution, there are other Python packages available like Celery, APScheduler, and Schedule.

Thanks for the read!

References

Congratulations, you made it to the end!

Get the code on GitHub here:

https://github.com/stevebelgium/telegramjobque

If you enjoyed this, please clap and follow me on Medium
