Creating a Scheduler Microservice for Task Management
Our project manages tasks that users create and assign to the people responsible for them. We focus primarily on the restaurant business, but the application is designed to adapt to other industries as well.
Initially, we developed basic task creation and assignment functionalities. Over time, we enhanced these features to include:
- Restricting task completion to specific geolocations.
- Allowing users to attach files when creating tasks.
- Allowing users to upload files upon task completion.
- Adding a comments section to facilitate user collaboration and task iteration.
- Saving task data as templates so that task details can be filled in quickly later.
Recently, we expanded our system to include task scheduling capabilities. Users can now schedule tasks for future creation and set up recurring schedules. Instead of creating a tightly integrated scheduling system, we opted to build a separate microservice to handle these requirements. This approach ensures that the scheduler can be utilized by any module within our application.
Our scheduler microservice is built with Node.js (Express), AWS SQS, and MongoDB. We chose a NoSQL database because the schedule data is loosely structured and the service needs to sustain a large number of fast writes. This architecture allows us to efficiently manage task schedules while maintaining flexibility for future enhancements and industry adaptations.
Service components
In our system, we’ve divided the scheduling functionality into several components, each with a specific role:
- Scheduler Service: This component handles basic schedule validation. If a schedule passes validation, it inserts the schedule into Queue 1 and responds to the caller with a success or failure message.
- Processor Worker: Responsible for processing messages from Queue 1. Here, custom business logic is applied, and calculations for the next schedule time are made. The processed data is then inserted into MongoDB for storage and future reference.
- Polling Worker: This worker runs on a timer (every 60 seconds in the code sample below) and queries MongoDB for all schedules due within the next 5 minutes. It inserts these records into Queue 2 and then advances the next_schedule_time field in the database by the schedule's repeat interval.
- Task Execution Worker: This component pulls messages from Queue 2 and schedules a timeout for each message based on the scheduled execution time. Upon timeout, the worker executes a task, which in this case involves making an API call.
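The Task Execution Worker's code is not shown in this post, so the following is only a minimal sketch of the timing logic it describes, not the exact implementation. It assumes each Queue 2 message carries a next_schedule_time timestamp, as the schedule documents later in this post do. The names msUntil and scheduleExecution, and the injected execute callback, are hypothetical.

```javascript
// Hypothetical helper: milliseconds to wait before running a schedule.
// Clamped at 0 so a message that arrives late runs immediately.
function msUntil(executeAt, now = new Date()) {
  return Math.max(0, new Date(executeAt).getTime() - now.getTime());
}

// Hypothetical helper: arm a timer for one Queue 2 message.
// `execute` performs the task (for example, the API call described by
// message.payload). It is injected so the sketch does not depend on any
// particular HTTP client.
function scheduleExecution(message, execute, now = new Date()) {
  const delay = msUntil(message.next_schedule_time, now);
  const timer = setTimeout(() => {
    // Catch sync throws and rejected promises so one failed task
    // does not take down the worker.
    Promise.resolve()
      .then(() => execute(message))
      .catch((err) => console.error("task execution failed:", err));
  }, delay);
  return { delay, timer };
}

// Usage: a schedule due 90 seconds after "now" gets a 90000 ms timer.
const exampleNow = new Date("2024-05-02T06:21:30Z");
const armed = scheduleExecution(
  { name: "testing", next_schedule_time: "2024-05-02T06:23:00Z" },
  async (msg) => console.log("executing", msg.name),
  exampleNow
);
console.log(armed.delay); // 90000
clearTimeout(armed.timer); // cancel so the example exits right away
```

Because the polling worker only looks 5 minutes ahead, each timer in such a design would be short-lived, which keeps the number of pending timeouts per worker small.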
Scheduler service
The scheduler service exposes four basic endpoints to manage schedules:
- POST /scheduler-service/v1/schedule: Creates a new schedule. The request body contains all the data needed to prepare and execute the task, whether that is an API call, a batch process, or a notification. Each schedule is assigned a UUID for tracking purposes.
- PUT /scheduler-service/v1/schedule/:scheduleId: Updates an existing schedule identified by its unique ID.
- GET /scheduler-service/v1/schedule/:scheduleId: Retrieves details of a specific schedule.
- DELETE /scheduler-service/v1/schedule/:scheduleId: Deletes a schedule identified by its unique ID.
In this blog post, we’ll focus on the POST endpoint, which plays a crucial role in initiating new schedules and managing task execution in our system. A sample request body looks like this:
{
  "name": "testing",
  "description": "testing the schedule functionality",
  "type": "API",
  "schedule": {
    "start_date": "05/02/2024", // MM/DD/YYYY
    "end_date": "05/04/2024", // MM/DD/YYYY
    "timeZone": "Asia/Calcutta",
    "time_of_day": "11:53",
    "am_pm": "AM",
    "repeat_interval": {
      "number": 2,
      "unit": "minute"
    }
  },
  "payload": {
    "method": "PUT",
    "url": "https://www.xyz.com/resconstruct",
    "headers": {
      "header1": "Basic fdastvhjasbc"
    },
    "queryParameters": {
      "param1": "121",
      "param2": "121"
    },
    "body": {
      "taskName": "demo task",
      "taskDescription": "testing a task creation",
      "checklistLabels": [],
      "checklistData": [],
      "dueDate": "2024-03-17T12:00:00Z",
      "private": false,
      "isSignatureRequired": true,
      "geoFenceEnabled": true,
      "geoFenceLatitude": 17.8967,
      "geoFenceLongitude": 88.789,
      "geoFenceRadius": 10,
      "geoFenceRadiusUnit": "meters",
      "attachments": [],
      "priority": "low",
      "businessID": 6,
      "userTags": [
        {
          "targetUserID": 3,
          "taskUserType": "assigned"
        },
        {
          "targetUserID": 7,
          "taskUserType": "mentioned"
        }
      ],
      "locationTags": [
        {
          "locationID": 2
        }
      ],
      "departmentTags": [
        {
          "departmentID": null,
          "departmentName": "floor-cleaning"
        }
      ]
    }
  },
  "metadata": {
    "created_by": "45667",
    "role": "admin"
  }
}
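The post does not list the scheduler service's validation rules, so here is a minimal sketch of what "basic schedule validation" could check for a body like the one above. The function names, the set of allowed units, and the rules themselves (including treating repeat_interval as optional) are assumptions for illustration.

```javascript
// Assumed set of units; the real service may accept others.
const ALLOWED_UNITS = ["minute", "hour", "day", "week", "month"];

// True only for a real calendar date written as MM/DD/YYYY.
function isRealDate(mmddyyyy) {
  const match = /^(\d{2})\/(\d{2})\/(\d{4})$/.exec(mmddyyyy ?? "");
  if (!match) return false;
  const [month, day, year] = match.slice(1).map(Number);
  const d = new Date(Date.UTC(year, month - 1, day));
  // Date rolls impossible dates over (02/30 becomes 03/01), so compare back.
  return (
    d.getUTCFullYear() === year &&
    d.getUTCMonth() === month - 1 &&
    d.getUTCDate() === day
  );
}

// MM/DD/YYYY -> YYYYMMDD so that plain string comparison orders dates.
function toSortable(mmddyyyy) {
  const [month, day, year] = mmddyyyy.split("/");
  return `${year}${month}${day}`;
}

// Hypothetical validator: returns a list of problems, empty when valid.
function validateScheduleRequest(body) {
  const errors = [];
  const schedule = body?.schedule ?? {};
  const startOk = isRealDate(schedule.start_date);
  const endOk = isRealDate(schedule.end_date);
  if (!body?.name) errors.push("name is required");
  if (!startOk) errors.push("start_date must be a real date in MM/DD/YYYY format");
  if (!endOk) errors.push("end_date must be a real date in MM/DD/YYYY format");
  if (startOk && endOk && toSortable(schedule.end_date) < toSortable(schedule.start_date)) {
    errors.push("end_date must not be before start_date");
  }
  if (!/^(0?[1-9]|1[0-2]):[0-5]\d$/.test(schedule.time_of_day ?? "")) {
    errors.push("time_of_day must be h:mm on a 12-hour clock");
  }
  if (!["AM", "PM"].includes(schedule.am_pm)) errors.push("am_pm must be AM or PM");
  // Assumption: a schedule without repeat_interval is a one-off.
  if (schedule.repeat_interval !== undefined) {
    const { number, unit } = schedule.repeat_interval ?? {};
    if (!Number.isInteger(number) || number <= 0) {
      errors.push("repeat_interval.number must be a positive integer");
    }
    if (!ALLOWED_UNITS.includes(unit)) errors.push("repeat_interval.unit is not supported");
  }
  return errors;
}

// Usage with the schedule part of the sample body
const sample = {
  name: "testing",
  schedule: {
    start_date: "05/02/2024",
    end_date: "05/04/2024",
    time_of_day: "11:53",
    am_pm: "AM",
    repeat_interval: { number: 2, unit: "minute" },
  },
};
console.log(validateScheduleRequest(sample)); // []
```

Returning a list of problems rather than throwing on the first one lets the service report every issue in a single failure response.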
Handling Timezones in Scheduler Service: Ensuring Robustness
To make our scheduler service robust across different local timezones, we rely on JavaScript’s Intl global object, which can list the timezone identifiers the runtime supports, and we use the Luxon package extensively for timezone-aware date handling. Together, these tools let us manage schedules across timezones and execute tasks at the correct local time. The list of supported timezones is available via:
Intl.supportedValuesOf("timeZone")
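One caveat worth illustrating: this list contains the identifiers the runtime treats as canonical, so a valid alias can be missing from it (which of Asia/Calcutta and Asia/Kolkata appears depends on the runtime). A more forgiving check, sketched below, is to let Intl.DateTimeFormat try to resolve the identifier, since it throws a RangeError for names it does not recognize. The function name isValidTimeZone is hypothetical, and the sketch assumes a Node.js build with Intl support.

```javascript
// Hypothetical helper: true if the runtime can resolve the IANA identifier.
// Intl.DateTimeFormat throws a RangeError for unknown time zones and
// accepts aliases as well as canonical names.
function isValidTimeZone(timeZone) {
  if (typeof timeZone !== "string" || timeZone.length === 0) return false;
  try {
    new Intl.DateTimeFormat("en-US", { timeZone });
    return true;
  } catch (err) {
    if (err instanceof RangeError) return false;
    throw err; // anything else is unexpected, so do not hide it
  }
}

console.log(isValidTimeZone("Asia/Calcutta")); // true
console.log(isValidTimeZone("Asia/Kolkata")); // true
console.log(isValidTimeZone("Mars/Olympus")); // false
```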
Processor worker
The message body in the code below is the JSON object shown in the scheduler service section above.
// extractDateComponents and convertTo24HourFormat are listed under "Helper functions".
// DateTimeWithZone, insertSchedule and deleteMessagefromQueue are project helpers
// that are not shown in this post.
async function processMessage(message) {
  try {
    //----------------------------------------------------------------
    // process the message and prepare schedule document
    //----------------------------------------------------------------
    const messageBody = JSON.parse(message.Body);
    const { schedule, ...rest } = messageBody;
    const scheduleDocument = { ...rest };
    const {
      start_date,
      end_date,
      timeZone,
      time_of_day,
      am_pm,
      repeat_interval,
    } = schedule;
    const {
      year: startYear,
      month: startMonth,
      day: startDay,
    } = extractDateComponents(start_date, "MM/dd/yyyy");
    const {
      year: endYear,
      month: endMonth,
      day: endDay,
    } = extractDateComponents(end_date, "MM/dd/yyyy");
    const { hour, minute } = convertTo24HourFormat(time_of_day + " " + am_pm);
    // start of the first day (00:00) in the schedule's timezone, stored as UTC
    scheduleDocument.start_date = new Date(
      new DateTimeWithZone(
        startYear,
        startMonth,
        startDay,
        0,
        0,
        timeZone
      ).getUTCString()
    );
    // end of the last day (23:59) in the schedule's timezone, stored as UTC
    scheduleDocument.end_date = new Date(
      new DateTimeWithZone(
        endYear,
        endMonth,
        endDay,
        23,
        59,
        timeZone
      ).getUTCString()
    );
    // first execution: the start day at the requested time of day
    scheduleDocument.next_schedule_time = new Date(
      new DateTimeWithZone(
        startYear,
        startMonth,
        startDay,
        hour,
        minute,
        timeZone
      ).getUTCString()
    );
    scheduleDocument.no_of_fails = 0;
    scheduleDocument.repeat_interval = repeat_interval;
    //----------------------------------------------------------------
    // insert schedule document in MongoDB
    //----------------------------------------------------------------
    await insertSchedule(
      CONSTANTS.dbURI,
      CONSTANTS.dbName,
      CONSTANTS.dbCollectionName,
      scheduleDocument
    );
  } catch (err) {
    console.error(err);
    // log the message in the queue for debugging
  } finally {
    //----------------------------------------------------------------
    // delete message from queue 1
    // (this runs even when processing failed, so a failed message is not retried)
    //----------------------------------------------------------------
    await deleteMessagefromQueue(CONSTANTS.QueueUrl1, message.ReceiptHandle);
  }
}
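The DateTimeWithZone helper used above is not shown in this post. As a rough, hypothetical stand-in for the job it has to do (turning a wall-clock time in a named timezone into a UTC instant), here is a sketch that uses only the built-in Intl API. The names offsetMs and zonedTimeToUtc are assumptions, and the real helper may work differently, for example by delegating to Luxon.

```javascript
// Offset of `timeZone` from UTC, in milliseconds, at the instant `utcMs`.
// `utcMs` is expected to fall on a whole second.
function offsetMs(utcMs, timeZone) {
  const parts = new Intl.DateTimeFormat("en-US", {
    timeZone,
    hourCycle: "h23", // 00-23, avoids a "24" hour at midnight
    year: "numeric",
    month: "2-digit",
    day: "2-digit",
    hour: "2-digit",
    minute: "2-digit",
    second: "2-digit",
  }).formatToParts(new Date(utcMs));
  const p = {};
  for (const { type, value } of parts) p[type] = Number(value);
  // Re-read the local wall-clock fields as if they were UTC; the
  // difference from the real instant is the zone's offset.
  const asUtc = Date.UTC(p.year, p.month - 1, p.day, p.hour, p.minute, p.second);
  return asUtc - utcMs;
}

// Hypothetical stand-in: wall-clock time in `timeZone` -> Date (UTC instant).
function zonedTimeToUtc({ year, month, day, hour, minute }, timeZone) {
  // First guess: pretend the wall-clock time is already UTC.
  const guess = Date.UTC(year, month - 1, day, hour, minute);
  const firstOffset = offsetMs(guess, timeZone);
  let utc = guess - firstOffset;
  // Near a DST change the offset at the corrected instant can differ,
  // so check once more and prefer the second offset.
  const secondOffset = offsetMs(utc, timeZone);
  if (secondOffset !== firstOffset) utc = guess - secondOffset;
  return new Date(utc);
}

// 11:53 in India (UTC+05:30) is 06:23 UTC
console.log(
  zonedTimeToUtc({ year: 2024, month: 5, day: 2, hour: 11, minute: 53 }, "Asia/Calcutta").toISOString()
); // 2024-05-02T06:23:00.000Z

// Noon in New York during daylight saving time (UTC-04:00) is 16:00 UTC
console.log(
  zonedTimeToUtc({ year: 2024, month: 7, day: 1, hour: 12, minute: 0 }, "America/New_York").toISOString()
); // 2024-07-01T16:00:00.000Z
```

Storing every timestamp as a UTC instant, as the processor worker does, means the polling query can compare plain dates without knowing each schedule's timezone.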
Polling worker
const { DateTime } = require("luxon");
const SQS = require("../../sqsObject");
const { connect, getDatabase } = require("../dataBase/db");
const CONSTANTS = require("../../constants");
const {
  generateUtcTimestampWithOffset,
  calcMillisecsRepeatIntv,
} = require("../date-time-utils/utils");

const sqsObject = new SQS(
  CONSTANTS.endpoint,
  CONSTANTS.accessKeyId,
  CONSTANTS.secretAccessKey,
  CONSTANTS.region
);
async function fetchSchedules(dbURI, dbName, dbCollectionName) {
  const database = await getDatabase(dbURI, dbName);
  const collection = database.collection(dbCollectionName);
  const currentTime = DateTime.utc();
  const fiveMinutesLater = currentTime.plus({ minutes: 5 });
  // active schedules whose next run falls within the next 5 minutes
  const query = {
    start_date: { $lte: currentTime.toJSDate() },
    end_date: { $gte: currentTime.toJSDate() },
    next_schedule_time: {
      $gte: currentTime.toJSDate(),
      $lte: fiveMinutesLater.toJSDate(),
    },
  };
  const result = await collection.find(query).toArray();
  console.log(result);
  return result;
}

async function updateSchedule(dbURI, dbName, dbCollectionName, schedule) {
  const database = await getDatabase(dbURI, dbName);
  const collection = database.collection(dbCollectionName);
  const result = await collection.updateOne(
    { _id: schedule._id },
    { $set: { next_schedule_time: schedule.next_schedule_time } }
  );
  console.log(`Updated ${result.modifiedCount} document`);
}
setInterval(async () => {
  try {
    // pick probable records from database
    const schedules = await fetchSchedules(
      CONSTANTS.dbURI,
      CONSTANTS.dbName,
      CONSTANTS.dbCollectionName
    );
    // forEach does not wait for async callbacks, so use for...of to
    // enqueue and update each schedule in order and surface any error
    for (const schedule of schedules) {
      // drop it in the queue
      await sqsObject.sendMessage(JSON.stringify(schedule), CONSTANTS.QueueUrl2);
      // change the next schedule date and update the db record
      schedule.next_schedule_time = new Date(
        generateUtcTimestampWithOffset(
          new Date(schedule.next_schedule_time).toISOString(),
          calcMillisecsRepeatIntv(
            schedule.repeat_interval.unit,
            schedule.repeat_interval.number
          )
        )
      );
      await updateSchedule(
        CONSTANTS.dbURI,
        CONSTANTS.dbName,
        CONSTANTS.dbCollectionName,
        schedule
      );
    }
  } catch (err) {
    // without this, a failed poll would surface as an unhandled rejection
    console.error(err);
  }
}, 60 * 1000);
Helper functions
const { DateTime, Interval } = require("luxon");

const isValidDateFormat = (dateString, format) => {
  try {
    const dt = DateTime.fromFormat(dateString, format);
    if (!dt.isValid) {
      throw new Error("Invalid date format");
    }
    return true;
  } catch (error) {
    return false;
  }
};

const extractDateComponents = (dateString, format) => {
  const dt = DateTime.fromFormat(dateString, format);
  if (!dt.isValid) {
    throw new Error("Invalid date format");
  }
  return {
    year: dt.year,
    month: dt.month,
    day: dt.day,
  };
};

/**
 * @param {string} timeString looks like "09:30 AM"
 */
const convertTo24HourFormat = (timeString) => {
  const time = DateTime.fromFormat(timeString, "h:mm a");
  if (!time.isValid) {
    throw new Error("Invalid time format");
  }
  const formattedTime = time.toFormat("HH:mm");
  const hour = time.hour.toString().padStart(2, "0");
  const minute = time.minute.toString().padStart(2, "0");
  return { hour, minute, formattedTime };
};
// Note: the interval is measured from the current time, so calendar units
// such as months yield a different length depending on when this is called.
const calcMillisecsRepeatIntv = (repeatIntervalUnit, repeatIntervalNumber) => {
  const now = DateTime.utc(); // Get the current time in UTC
  const end = now.plus({ [repeatIntervalUnit]: repeatIntervalNumber }); // Calculate the end time
  // Create an interval between now and the calculated end time
  const interval = Interval.fromDateTimes(now, end);
  // Return the length of the interval in milliseconds
  return interval.length("milliseconds");
};
const generateUtcTimestampWithOffset = (utcTimestamp, milliseconds) => {
  console.log(utcTimestamp, milliseconds);
  const dt = DateTime.fromISO(utcTimestamp, { zone: "utc" });
  if (!dt.isValid) {
    throw new Error("Invalid UTC timestamp format");
  }
  const newDt = DateTime.fromMillis(dt.toMillis() + milliseconds, {
    zone: "utc",
  });
  return newDt.toUTC().toString();
};
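One situation these helpers do not cover is a schedule whose next_schedule_time is already in the past, for example after the polling worker has been down for a while. Adding a single interval can still leave it in the past, and the polling query only matches future times. Below is a minimal sketch of one way to catch up, assuming fixed-length units only (calendar units such as months are better handled with Luxon's plus()). The names UNIT_MS and nextOccurrenceAfter are hypothetical and not part of the service.

```javascript
// Fixed-length units only. "day" and "week" are treated as exactly 24 h and
// 7 x 24 h, which ignores DST shifts; "month" is left out on purpose.
const UNIT_MS = {
  minute: 60_000,
  hour: 3_600_000,
  day: 86_400_000,
  week: 604_800_000,
};

// Hypothetical helper: first occurrence strictly after `now`, stepping from
// `previous` in whole repeat intervals so the schedule keeps its original grid.
function nextOccurrenceAfter(previous, repeatInterval, now = new Date()) {
  const stepMs = UNIT_MS[repeatInterval.unit] * repeatInterval.number;
  if (!Number.isFinite(stepMs) || stepMs <= 0) {
    throw new Error(`Unsupported repeat interval: ${JSON.stringify(repeatInterval)}`);
  }
  const previousMs = new Date(previous).getTime();
  const elapsedMs = now.getTime() - previousMs;
  // Always move at least one step forward; more if occurrences were missed.
  const steps = Math.max(1, Math.floor(elapsedMs / stepMs) + 1);
  return new Date(previousMs + steps * stepMs);
}

// A 2-minute schedule last due at 06:23 is checked at 06:30:
// 06:25, 06:27 and 06:29 were missed, so the next run is 06:31.
const next = nextOccurrenceAfter(
  "2024-05-02T06:23:00Z",
  { number: 2, unit: "minute" },
  new Date("2024-05-02T06:30:00Z")
);
console.log(next.toISOString()); // 2024-05-02T06:31:00.000Z
```

Whether missed occurrences should be skipped like this or replayed one by one is a product decision. The sketch skips them.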
Alternative Approach: Single Queue for Simplified Scheduling
For smaller-scale applications with fewer requests, a single-queue design is worth considering. In this model, the scheduler service receives requests, processes them, inserts them into MongoDB, and responds with the relevant status, all through a single queue.
While this approach simplifies the architecture and is suitable for low request volumes, it may introduce latency and scalability challenges as the traffic grows. Throttling requests becomes increasingly important as the service’s load increases.
[Figure: design diagram of the single-queue approach]
This alternative approach offers simplicity but may require reevaluation and adjustment as the application’s traffic and scheduling requirements evolve.
Conclusion
In this blog post, we’ve explored how to create a robust and flexible scheduler microservice using Node.js (Express), AWS SQS, and MongoDB. Our microservice is designed to manage future task scheduling and recurring schedules efficiently, making it adaptable to various industries beyond the restaurant business.
We’ve walked through the architecture, key components, and code snippets, focusing on the POST endpoint for creating schedules. By leveraging JavaScript’s Intl global object and the Luxon package, our service can handle different timezones accurately, ensuring reliable task execution.
To simplify the process of implementing similar scheduling functionalities in your own projects, I’ve created an npm package called @tasklyhub/scheduler. This package encapsulates the core scheduling logic and can be easily integrated into your applications.
Check out the @tasklyhub/scheduler npm package for a ready-to-use implementation:
npm install @tasklyhub/scheduler
Happy scheduling!