Automated Job Performing Queue System using Python

I love Python language, its freestyle of coding encourages myself to write solutions to many real time problems. I have been using Python on a daily basis from past 2 years now. I mostly work on tornado framework professionally to build web applications but I am also fascinated to write command line applications using Python as well.

Today, I am going to share one of my coding assignment from Amazon Code Contest in which I have been asked to write an automated job performing queue system for a warehouse management.

Problem Statement: Create an Automated Job Performing Queue System based on jobs priority and employees skills. Employees were given with their skills and Jobs were given with skills to be used to finish them along with their priorities and time to finish as inputs using STDIN. Points which should be taken care were,

  1. I should maintain the order of employees in which they were entered into the system while distributing the Jobs.
  2. As, Multiple employees can have similar skills and multiple jobs can be finished using similar skills so an employee should choose a job relevant to his skills and also which have high priority amongst similar ones.

Approach: To solve this problem, I chose to go with synchronised thread system where threads will represent employees available into the system. I used a Queue data structure from Python which helped me to maintain synchronisation between multiple threads, basically I put all the employees for a specific skills into the Queue at the beginning, pull out employees one by one to allocate jobs to them and push back into the Queue once they complete the given job. To show the busyness of the employees I used time.sleep() which says that the particular employee or thread will go to sleep until time to finish attribute for that job gets over.

Below, I am sharing the code for this problem statement. I have a created a python class named WareHouseManagement which consists all the supporting methods.

import threading
import time
import Queue
class WareHouseManagement(object):
'''
Description:
    An Automated Warehouse system which
consists methods to register
employees and jobs into the system.
It also allocates jobs to respected
skilled employee. It represents employees
as threads and use time.sleep() to show
their busyness. It uses Queue data structure
to create a synchronzied multithread system.
    '''
def __init__(self):
self.track = {}
self.employees = {}
self.jobs = {}
    def store_employees(self, nemps):
'''
Function name: store_employees
        Description:
        Function to register employees into the
warehouse system. It takes employees
information from STDIN and adding it to
employees dict. Idea is to store employees
based on their respective skills.
        Arguments:
        It takes nemps as an inputs which is
total number of employees will be provided
through STDIN.
        '''
ecount = 0
while ecount < nemps:
eid, sid = raw_input().split('#')
try:
self.employees[sid].append(eid)
except KeyError:
self.employees[sid] = [eid]
ecount += 1
    def store_jobs(self, njobs):
'''
Function name: store_jobs
        Description:
        Function to register jobs into the
warehouse system. It takes jobs
information from STDIN and adding it to
jobs dict.Idea is to store jobs
based on their respective skills to finish
them.
        Arguments:
        It takes njobs as an inputs which is
total number of jobs will be provided
through STDIN.
        '''
jcount = 0
while jcount < njobs:
# skill id, priority, time to finish, job id
sid, pri, ttf, jid = raw_input().split('#')
pri, ttf, jid = int(pri), int(ttf), int(jid)
job = {}
job[pri] = {'jid': jid, 'ttf': ttf}
try:
self.jobs[sid].append(job)
except KeyError:
self.jobs[sid] = [job]
jcount += 1
    def perform_job(self, emp, meta, queue):
'''
Function name: perform_job
        Description:
        Function to replicate actual performing
job with time.sleep(). This function does
3 things,
        1. it maintains tracks of
employees which says which job has been done
by which employee. This will going to be consumed
at the end of the program as to show output of
the assignment.
        2. It uses time.sleep() to show that current
employee is working and busy for specific time
period to complete the job.
        3. It adds the employee to back to Queue data structure
once the employee is done with the allocated job so that
in the next iteration new job can be given to the
employee.
        Arguments:
        It takes emp, meta, queue as an inputs which are
employee name, job detail and queue data structure
respectively.
        '''
try:
self.track[emp].append(meta)
except KeyError:
self.track[emp] = [meta]
time.sleep(meta.values()[0]['ttf'])
queue.put(emp)
    def distribute_jobs(self, skill, rjobs):
'''
Function name: distribute_jobs
        Description:
        Function to create a Queue to install
employees into the system for a specific job
as there can be multiple employees for same
skills and same jobs.
        It also creates threads which replicates
employees and distribute jobs to them.
        Arguments:
        It takes skill, rjobs as an inputs which are
skill provided through STDIN and jobs associated
with that skills.
        '''
jobcount = len(rjobs)
jcount = 0
threads = []
        queue = Queue.Queue()
for emp in self.employees[skill]:
queue.put(emp)
       # sorting jobs based on their priority
rjobs = sorted(rjobs, reverse=True)
        while jcount < jobcount:
thread = threading.Thread(target=self.perform_job,
args=(queue.get(timeout=100),
rjobs[jcount],
queue))
threads.append(thread)
thread.start()
jcount += 1
        return threads
    def print_track(self):
'''
Function name: print_track
        Description:
        Function to print track variable
on STDOUT according to the defined
format.
        '''
emps = self.track.keys()
emps.sort()
for emp in emps:
result = emp + '#'
jobs = self.track.get(emp)
for job in jobs:
for meta in job.values():
result += '{}#'.format(meta.get('jid'))
result = result[:len(result) - 1]
print result
def main():
'''
Function name: main
    Descripton:
    Function to start the Warehouse system.
    '''
# total number of employees
nemps = int(raw_input())
warehouse = WareHouseManagement()
warehouse.store_employees(nemps)
    # total number of jobs
njobs = int(raw_input())
warehouse.store_jobs(njobs)
    threads = []
    for skill, rjobs in warehouse.jobs.iteritems():
threads = warehouse.distribute_jobs(skill, rjobs)
    for thread in threads:
thread.join()
    # printing track
warehouse.print_track()
main()

Instead of using threads, one can create a multi-process system also but multi-process system will be much heavy when so many employees entered into the system. Queue data structure is great and helpful library I could find while doing research for this problem statement as it manages all the synchronisation along with thread locks internally and developers do not need to think in this area while writing solutions for problems.

That is it, Please feel free to comment and share your doubts if any.

One clap, two clap, three clap, forty?

By clapping more or less, you can signal to us which stories really stand out.