AWS Lambda script to delete Inactive Users
I was doing learning within AWS and playing around with AWS lambda functions. This particular lambda functions utilizes boto3 library to delete all inactive user account in a aws account.
We may also convert the lambda function in the form of stackset to deploy it across all aws accounts.
import json
import boto3
from datetime import datetime
from datetime import timedelta
from botocore.exceptions import ClientError
import requests
import os
date_now = datetime.now()
iam_client = boto3.client('iam')
iam = boto3.resource('iam')
max_idle_days = 30
max_items = 20
##this is after error debug of not able to send msg to slack
#slack
url = os.environ['WEBHOOK_URL']
slack_channel = os.environ['SLACK_CHANNEL_NAME']
slack_emoji = ":aws-iam:"
slack_bot_username = "IAM Bot" # Slackbot Username
slack_message_title = ""
headers = {
'Content-Type': "application/json",
'User-Agent': "PostmanRuntime/7.19.0",
'Accept': "*/*",
'Cache-Control': "no-cache",
'Host': "hooks.slack.com",
'Accept-Encoding': "gzip, deflate",
'Content-Length': "497",
'Connection': "keep-alive",
'cache-control': "no-cache"
}
def get_sensored_access_key(access_key):
first_four = access_key[:4]
last_four = access_key[-4:]
return first_four + "*********" + last_four
def get_account_alias():
aliases = iam_client.list_account_aliases()['AccountAliases']
alias = ""
if len(aliases) == 0:
alias = id = boto3.client('sts').get_caller_identity().get('Account')
else:
alias = aliases[0]
return alias
def get_slack_payload(user, is_access_key, access_key="", arn="", diff=-1):
account_alias = get_account_alias()
payload = ""
if diff == -1:
diff = "Never"
else:
diff = str(diff) + " days"
payload = """{
\n\t\"channel\": \"#""" + slack_channel + """\",
\n\t\"username\": \"""" + slack_bot_username + """\",
\n\t\"icon_emoji\": \"""" + slack_emoji + """\",
\n\t\"attachments\":[\n
{\n
\"fallback\":\"Access Key Deactivated\",\n
\"pretext\":\"Access Key Deactivated\",\n
\"color\":\"#34bb13\",\n
\"fields\":[\n
{\n
\"value\":\"*Account:* """ + account_alias + """\n*User:* """ + user + """\n*ARN:* """ + arn + """\n*Access Key:* """ + access_key + """\n*Last Accessed:* """ + diff + """ \"\n
}\n
]\n
}\n
]\n
}"""
return payload
####main code #####
def lambda_handler(event, context):
try:
print("Running Lambda")
res_users = iam_client.list_users(
MaxItems=max_items
)
for user in res_users['Users']:
#print (user)
check_access_keys(user)
##to send general slack alert
webhook = 'https://hooks.slack.com/services/abc'
slack_data = {'text': "Lambda Ran"}
r = requests.post(
webhook, data=json.dumps(slack_data),
headers={'Content-Type': 'application/json'}
)
if r.status_code != 200:
raise ValueError(
'Request to slack returned an error %s, the response is:\n%s'
% (r.status_code, r.text))
except ClientError as error:
print('An error occurred while fetching user list.', error)
if res_users['IsTruncated']:
while res_users['IsTruncated']:
marker = res_users['Marker']
try:
res_users = iam_client.list_users(Marker=marker,MaxItems=max_items)
for user in res_users['Users']:
#check_login_profile(user)
check_access_keys(user)
except ClientError as error:
print('An error occurred while fetching user list.', error)
def check_access_keys(userData):
created_date = datetime.now()
last_used_date = datetime.now()
access_key_id = None
username = userData['UserName']
user_arn = userData['Arn']
# Below we are checking for access keys last usage
try:
res_keys = iam_client.list_access_keys(UserName=username,MaxItems=2)
#print(res_keys)
if 'AccessKeyMetadata' in res_keys:
for key in res_keys['AccessKeyMetadata']:
if 'CreateDate' in key:
created_date = res_keys['AccessKeyMetadata'][0]['CreateDate'].replace(tzinfo=None)
if 'AccessKeyId' in key:
access_key_id = key['AccessKeyId']
#print(access_key_id)
res_last_used_key = iam_client.get_access_key_last_used(AccessKeyId=access_key_id)
#print(res_last_used_key)
if 'LastUsedDate' in res_last_used_key['AccessKeyLastUsed']:
last_used_date = res_last_used_key['AccessKeyLastUsed']['LastUsedDate'].replace(tzinfo=None)
else:
last_used_date = created_date
difference = date_now - last_used_date
access_key_status = key['Status'] # Get status of the access keys
###working
if difference.days > max_idle_days and access_key_status == "Inactive":
access_key=iam.AccessKey(username,access_key_id)
#for logging help in cloudwatch
print(username)
print(access_key_id+ " deleted")
#Deactivate Access key
print("last access key block")
response = requests.request("POST", url, data=get_slack_payload(username, True, get_sensored_access_key(access_key_id), user_arn, diff=difference.days), headers=headers)
if response.status_code != 200:
raise ValueError(
'Request to slack returned an error %s, the response is:\n%s'
% (response.status_code, response.text))
response = access_key.delete()
except ClientError as error:
print('An error occurred while listing access keys', error)
So here the logic has been like if an access token has been inactive for more than 30 days, we will perform the cleanup and directly delete it. Do note that here we have assumed that, we are deleting the users whose access token is inactive. We may also change the logic to delete the access key which might be active but which are not used for more than 30 days. We just need to do some basic changes in the logic which anyone reading may easily figure out.
Feel free to let me know in comments if any help is needed to modify the lambda function.