cft

AWS -EMR Monitoring and Alerting Bot

A bot to alert via slack channel monitoring the EMR cluster usage.


user

Mageswaran D

3 years ago | 22 min read

With AWS EMR a brand new cluster of any size is a few clicks away, with that lets focus on how to monitor the cluster and cut down the cost as much as possible.

Typically after few days people may forget to shutdown the clusters, leaving the budget bleeding heavily on the AWS billing.


While searching for EMR monitoring and controlling, we found following links:

Both authors have given excellent ideas and steps to monitor the resources. However we were looking for more simpler solution with less moving parts.

We wanted to go with AWS BOTO3 + AIRFLOW with Slack and Gmail alerts.

Of course there is no way of escaping Cloudwatch. We use cloudwatch to monitor the EMR metrics and Boto3 EMR APIs to take any action. Check this link for list of metrics avaiable https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingEMR_ViewingMetrics.html

1. Alert : Slack

We used Airflow Slack operator for slack alerts, for more details check out here.

from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
SLACK_CONN_ID = 'Slack'def emr_slack_alert(context, msgs):
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
:large_blue_circle: Task suceeded. {ti}
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
*Cluster Details* : {msgs}
*Please check above clusters for manual scale down.*
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
ti=context.get('task_instance'),
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
msgs="".join(msgs))

success_alert = SlackWebhookOperator(
task_id='slack_test',
http_conn_id='Slack',
webhook_token=slack_webhook_token,
message=slack_msg,
username='airflow')

return success_alert.execute(context=context)

2. Alert : Gmail

We used Yagmail for pythonic way of triggering the mails. (Note : `From` address used in Yagmail needs special permission for 3rd party apps to login behalf and send emails!, follow the errors in your setup to get more help, as I lost reference for the setup at the moment)

import yagmail

yag = yagmail.SMTP(from_address, from_email_password)
yag.send(to=to_address, subject=subject, contents=contents, attachments=attachments)

Where each variable is self explanatory.

def notify_email(cluster_name, num_core_nodes, num_tasks_nodes, mail_id):
"""Send custom email alerts."""

if mail_id is None:
return

# email title.
title = "Airflow EMR alert: {}".format(cluster_name)

# email contents
body = """
Hi Everyone,

Cluster {} is running {} core node(s) and {} tasks node(s).
Please check above cluster for manual scale down.

Forever yours,
Airflow bot
""".format(cluster_name, num_core_nodes, num_tasks_nodes)

logging.info("Sending the mail....")

# send_email(mail_id, title, body)

YagMail.email_notifier_ini(subject=title,
contents=body,
to_address=mail_id,
config_dir="/home/airflow/config_files/")

Note “ YagMail.email_notifier_ini” is wrapper class to abstract the reading of the mail credentials using configparser.

3 . AWS Boto3 : EMR + Cloudwatch

Check this repository https://github.com/Scout24/emr-autoscaling for a complete EMR scaling with Lambda and Boto3 APIs, which I used a reference.

Below is class that abstracts most of EMR and Cloudwatch interaction with easy to use APIs for our needs.

"""
Copy this file to AIRFLOW_HOME/dags/
"""

import boto3
from datetime import datetime, timedelta
import logging

logging = logging.getLogger(__name__)
logging.setLevel("ERROR")

class EMRMonitor(object):
"""
Class to monitor EMR cluster and take control actions
Reference:
- https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingEMR_ViewingMetrics.html
- https://github.com/Scout24/emr-autoscaling
- https://www.terraform.io/docs/providers/aws/r/emr_cluster.html
"""
def __init__(self,
job_flow_id=None,
job_flow_name="default",
region_name='us-east-1'):
"""

:param job_flow_id: Clsuter Id
:param job_flow_name: Name of the clsuter
:param region_name: AWS region name
"""
self._emr = boto3.client('emr', region_name=region_name)
self._cloudwatch = boto3.client('cloudwatch', region_name=region_name)

self._job_flow_id = job_flow_id
self._job_flow_name = job_flow_name

print("EMRVHMonitor")

# TODO ues @property
def set_job_flow_id(self, job_flow_id):
self._job_flow_id = job_flow_id

def set_job_flow_name(self, job_flow_name):
self._job_flow_name = job_flow_name

def get_list_of_clusters(self):
"""
Check out the API `list_cluster` for details response information.
Pagination is used to get all the clusters.
:return: List of cluster details starting with Id.
"""
page_iterator = self._emr.get_paginator('list_clusters').paginate(
CreatedAfter=datetime(2019, 1, 1),
CreatedBefore=datetime(2021, 1, 1),
ClusterStates=[
'STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING', 'TERMINATING', 'TERMINATED', 'TERMINATED_WITH_ERRORS',
]
)

response = []

for page in page_iterator:
for clusters in page['Clusters']:
response.append(clusters)

return response

def get_active_clusters(self):
"""
Filters the active running clusters.
:return: List of response of active clusters
"""
response = self.get_list_of_clusters()
res = []
for cluster in response:
if cluster["Status"]["State"] in ['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING']:
res.append(cluster)
return res

def get_active_clusters_id_n_names(self):
"""

:return: List of Ids and Names of the active clusters
"""
res = self.get_active_clusters()
ids = []
names = []
for cluster in res:
id = cluster["Id"]
name = cluster["Name"]
ids.append(id)
names.append(name)
return ids, names

def get_task_instance_groups(self):
"""

:return: Returns list of TASK instances group for current EMR cluster
"""
assert self._job_flow_id is not None
res = self._emr.list_instance_groups(ClusterId=self._job_flow_id)["InstanceGroups"]
res = filter(
lambda g: g["InstanceGroupType"] == "TASK",
res
)
return list(res)

def get_core_instance_groups(self):
"""

:return: Returns list of CORE instances group for current EMR cluster
"""
assert self._job_flow_id is not None
res = self._emr.list_instance_groups(ClusterId=self._job_flow_id)["InstanceGroups"]
res = filter(
lambda g: g["InstanceGroupType"] == "CORE",
res
)
return list(res)

def get_emr_cloudwatch_metric(self, name, statistics, unit):
"""
GEts the cloudwatch metrics for last one hours
:param name: Name of the metric
:param statistics: Minimum/Maximum/Count/Average
:param unit: Refer EMR `get_metric_statistics` API for more details
:return:
"""
"""
Gets the metrics for given name
:param name: As mentioned in https://docs.aws.amazon.com/emr/latest/ManagementGuide/UsingEMR_ViewingMetrics.html
:return:
"""

assert self._job_flow_id is not None

res = -1
now = datetime.utcnow().replace(second=0, microsecond=0)
stats = self._cloudwatch.get_metric_statistics(
Namespace="AWS/ElasticMapReduce",
MetricName=name,
StartTime=now - timedelta(minutes=60),
EndTime=now,
Period=300,
Statistics=[
statistics,
],
Unit=unit,
Dimensions=[
{
"Name": "JobFlowId",
"Value": self._job_flow_id
}
]
)

try:
res = stats["Datapoints"][0][statistics]
except:
print(stats)
return res

def set_nodes_policy(self, id, min=0, max=10):
"""
Sets the auto scaling policy for the instance group nodes
:param id: ID of the group instances
:param min:
:param max:
:return:
"""

assert self._job_flow_id is not None

self._emr.put_auto_scaling_policy(
ClusterId=self._job_flow_id,
InstanceGroupId=id,
AutoScalingPolicy={
'Constraints': {
'MinCapacity': min,
'MaxCapacity': max
},
'Rules' : [
# {
# "Name": "scale-out",
# "Description": "testing emr scale out with boto3",
# "Action": {
# # "Market": "SPOT",
# "SimpleScalingPolicyConfiguration": {
# "AdjustmentType": "CHANGE_IN_CAPACITY",
# "ScalingAdjustment": 0,
# "CoolDown": 300
# }
# },
# "Trigger": {
# "CloudWatchAlarmDefinition": {
# "ComparisonOperator": "GREATER_THAN",
# "EvaluationPeriods": 1,
# "MetricName": "YARNMemoryAvailablePercentage",
# "Namespace": "AWS/ElasticMapReduce",
# "Period": 300,
# "Statistic": "AVERAGE",
# "Threshold": 75.0,
# "Unit": "PERCENT"
# }
# }
# }
],
}
)

def check_and_scale_down(self):
"""
Checks for `YARNMemoryAvailablePercentage`, `CoreNodesRunning`, `TaskNodesRunning` metrics.
If the memory is greater than 90% and node counts are greater than 1, returns True or otherise.

Note: No control action is taken for now
:return: True/False
"""
# print("Checking cluster {} with id {}".format(self._job_flow_name, self._job_flow_id))

is_scale_down = False
num_core_nodes = -1
num_tasks_nodes = -1
assert self._job_flow_id is not None

yarn_available_memory_percent = self.get_emr_cloudwatch_metric(name="YARNMemoryAvailablePercentage",
statistics="Average",
unit="Percent")
apps_running_count = self.get_emr_cloudwatch_metric(name="AppsRunning",
statistics="Maximum",
unit="Count")

logging.debug("YARNMemoryAvailablePercentage : {}".format(yarn_available_memory_percent))
logging.debug("AppsRunning : {}".format(apps_running_count))

print("YARNMemoryAvailablePercentage : {}".format(yarn_available_memory_percent))
print("AppsRunning : {}".format(apps_running_count))

if yarn_available_memory_percent > 90 : #and apps_running_count == 0:
num_core_nodes = self.get_emr_cloudwatch_metric("CoreNodesRunning",
statistics="Average",
unit="Count")
num_tasks_nodes = self.get_emr_cloudwatch_metric("TaskNodesRunning",
statistics="Average",
unit="Count")

if num_core_nodes > 1:
is_scale_down = True
try:
id = self.get_core_instance_groups()[0]["Id"]
# print("Following core cluster group needs action: ", end=" ")
# print(id)
# emr.set_nodes_policy(id=id, min=1, max=1)
except Exception as e:
raise RuntimeError("Not able to scale down EMR cluster")

if num_tasks_nodes > 1:
is_scale_down = True
try:
id = self.get_task_instance_groups()[0]["Id"]
# print("Following task cluster group needs action: ", end=" ")
# print(id)
# emr.set_nodes_policy(id=id, min=0, max=1)
except Exception as e:
raise RuntimeError("Not able to scale down EMR cluster")

if is_scale_down:
print("Cluster {} with id {} needs attentions! which is running {} core nodes {} task nodes".format(
self._job_flow_name, self._job_flow_id,
num_core_nodes, num_tasks_nodes))
return is_scale_down, num_core_nodes, num_tasks_nodes

def describe(self):
"""
Gets the description of the current cluster
:return:
"""
assert self._job_flow_id is not None
response = self._emr.describe_cluster(ClusterId=self._job_flow_id)
return response["Cluster"]

def get_email_id(self):
"""
Looks and extracts the value of the key `Email` if present in the EMR Summary page Tags.
:return:
"""
email_id = None
tags = self.describe()["Tags"]
for tag in tags:
if tag['Key'] == "Email":
email_id = tag["Value"]
return email_id


if __name__ == '__main__':
emr = EMRMonitor()
cluster_ids, cluster_names = emr.get_active_clusters_id_n_names()

print(cluster_ids)
print(cluster_names)
for cluster_id, cluster_name in zip(cluster_ids, cluster_names):
print(">>>>>>>>>>>>>>>>>>>>>")
print(cluster_name)
emr.set_job_flow_id(job_flow_id=cluster_id)
emr.set_job_flow_name(job_flow_name=cluster_name)
emr.check_and_scale_down()
print("<<<<<<<<<<<<<<<<<<<<<")

Here we are listing down all the active clusters and filtering the idle cluster based on Yarn memory, number of core and task nodes. (this is just an example, we can have more sophisticated business requirements to monitor and apply EMR policy, for naive approach this will be a good start)

Note: This code was directly tested on EMR whoch has all the permission to use Boto3, if you wanted to test on non EMR/EC2 machine, you need to set up the EMR/Cloudwatch client with proper AWS keys.

4: Airflow : Custom Operator

Now we have means to trigger alerts, monitor and control EMR. How to schedule our bot into action? With Custom Airflow Operator. Why not Python Operator, well its matter of choice and the control we wanted on our operator.

from airflow.models import BaseOperatorclass EMRMonitorOperator(BaseOperator):

@apply_defaults
def __init__(self, job_flow_id, job_flow_name, *args, **kwargs):
super(EMRMonitorOperator, self).__init__(*args, **kwargs)
self._emr = EMRMonitor(job_flow_id=job_flow_id, job_flow_name=job_flow_name)
self._job_flow_id = job_flow_id
self._job_flow_name = job_flow_name

def execute(self, context):

is_scale_down = False
#TODO use Airflow Variables to monitor only a subset of clusters
cluster_ids, cluster_names = self._emr.get_active_clusters_id_n_names()

print(cluster_ids)
print(cluster_names)

logging.info(cluster_ids)
logging.info(cluster_names)

msgs = []

for cluster_id, cluster_name in zip(cluster_ids, cluster_names):
self._emr.set_job_flow_id(job_flow_id=cluster_id)
self._emr.set_job_flow_name(job_flow_name=cluster_name)
res, num_core_nodes, num_tasks_nodes = self._emr.check_and_scale_down()
is_scale_down = is_scale_down or res
if res: # Add current cluster details if it is flagged as scale down
msg = "\nCluster {} is running {} Core node(s) and {} Tasks node(s).".format(
cluster_name, num_core_nodes, num_tasks_nodes)
msgs.append(msg)
mail_id = self._emr.get_email_id()
notify_email(cluster_name=cluster_name,
num_core_nodes=num_core_nodes,
num_tasks_nodes=num_tasks_nodes,
mail_id=mail_id)

if is_scale_down:
emr_slack_alert(context=context, msgs=msgs)

5 :Airflow Dag

And finally lets make our Airflow Dag…

SCHEDULE_INTERVAL = "0 * * * *"
cluster_ids = None
fernet = get_fernet()

default_args = {
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"retries": 1,
"retry_delay": datetime.timedelta(hours=1),
}dag = DAG("EMRMonitoring",
default_args=default_args,
catchup=False,
schedule_interval=SCHEDULE_INTERVAL)

start = DummyOperator(
task_id='start',
dag=dag
)

end = DummyOperator(
task_id='end',
dag=dag)


emr_monitor_dag = EMRMonitorOperator(job_flow_id=None,
job_flow_name=None,
task_id='emr_monitor',
dag=dag)
start.set_downstream(emr_monitor_dag)
emr_monitor_dag.set_downstream(end)

Dadaa huh our bot is ready…

Our Bot : Configparser + Yagmail + Airflow Slack Operator + Boto3 (EMR & Cloudwatch) + Airflow Custom Operator

All the code snippets shared here are bits and pieces just to give the readers an idea on how to build a in house python AWS EMR monitoring bot! The code shared needs good amount of cleanups and through testing!

Upvote


user
Created by

Mageswaran D


people
Post

Upvote

Downvote

Comment

Bookmark

Share


Related Articles