Production ready and cloud agnostic SSH Alarms on Slack

Raphael.dll
3 min readNov 29, 2022

--

With a simple search on google is possible to see that its very hard to find good quality material on SSH Alarms , simple because nowadays people tend to depend more and more on the Cloud to do simple tasks .

A good example its AWS CloudWatch with its logs groups that are great but they depend on SSM Agent.

Lets imagine one host that no one its supposed to access it and its high value target , if your company has a SOC Team the access to this machine can monitored and mitigated but if not then it can become a nightmare

So this tutorial aims to teach how it is possible to not just receive alert from SSH in cloud machines without agents but also how to receive IOC “Indicators of compromise” on the alerts .

Hand On Lab

10 Minutes

  1. Lets start with a simple line of code
 sudo echo "session optional pam_exec.so seteuid /etc/ssh/scripts/sshnotify.sh" >> /etc/pam.d/sshd

This command says every time SSH Service its used it should run an script called sshnotify.sh

2. Now lets create the sshnotify script

sudo vim /etc/ssh/scripts/sshnotify.sh

3. Once VIM opens paste the following code

#!/bin/bash

if [ "$PAM_TYPE" != "close_session" ]; then
host="$(hostname)"
python3 /etc/ssh/scripts/slack.py -ip $PAM_RHOST -host $host -user $PAM_USER
fi
exit

4. Make the script executable

sudo chmod +x /etc/ssh/scripts/sshnotify.sh

5 . Now as last step lets create the python script that will handle the Slack message as well the IOC Research .

Please put attention that abuseipdb has a free API , you need to do an account to get yout key.
Slack Webhook its being used .

sudo vim /etc/ssh/scripts/slack.py
import json
import requests
import os
import argparse

parser = argparse.ArgumentParser()

parser.add_argument('-ip', action='store', dest='simple_ip',
help='Store ip')

parser.add_argument('-host', action='store', dest='simple_host',
help='Store host')

parser.add_argument('-user', action='store', dest='simple_user',
help='Store user')



results = parser.parse_args()

print(results.simple_ip)

print(results.simple_host)


querystring = {
'ipAddress': results.simple_ip,
'maxAgeInDays': '1'
}

# Defining the api-endpoint
url = 'https://api.abuseipdb.com/api/v2/check'



headers = {
'Accept': 'application/json',
'Key': 'IPDB API KEY XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
}





response = requests.request(method='GET', url=url, headers=headers, params=querystring)

# Formatted output
decodedResponse = json.loads(response.text)
decode = json.dumps(decodedResponse, sort_keys=True, indent=4)
print(decode)
# Set the webhook_url to the one provided by Slack when you create the webhook at https://my.slack.com/services/new/incoming-webhook/
webhook_url = 'https://hooks.slack.com/XXXXXXXXXXXXXX'
slack_data = {
"blocks": [
{
"type": "image",
"image_url": "https://www.abuseipdb.com/img/abuseipdb.png.pagespeed.ce.CI8T6WsXU7.png",
"alt_text": "inspiration"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Security report : *"
}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "IP: " + str(decodedResponse["data"]["ipAddress"])

}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Domain: " + str(decodedResponse["data"]["domain"])

}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Last date reported: " + str(decodedResponse["data"]["lastReportedAt"])

}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Abuse confidence: " + str(decodedResponse["data"]["abuseConfidenceScore"])

}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Country: " + str(decodedResponse["data"]["countryCode"])

}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Usage type: " + str(decodedResponse["data"]["usageType"])

}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "Total Reports: " + str(decodedResponse["data"]["totalReports"])

}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": "ISP: " + str(decodedResponse["data"]["isp"])

}
},
{
"type": "divider"
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Sumary*"
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*SSH login:* " + " User " +str(results.simple_user) +" to host the "+ str(results.simple_host)
}
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*SSH Accessfrom IP* " + str(results.simple_ip)
}
}
]
}





response = requests.post(
webhook_url, data=json.dumps(slack_data),
headers={'Content-Type': 'application/json'}
)
if response.status_code != 200:
raise ValueError(
'Request to slack returned an error %s, the response is:\n%s'
% (response.status_code, response.text)
)

Last do not forget to restart SSH Service with

sudo systemctl restart sshd

Once an successful login to the SSH Machine

As my IP is clean no reports come from it , but in a real situation it can help you understand if incident response its needed .

--

--