  • joeyancheta @joeyancheta ·

    Riccardo, this is exactly what I was looking for! I don't have python skills (maybe in the near future), but how would I modify this for filtering by tags for users? My users aren't created with valid email addresses so changing the client's process is not likely.

  • Hi @joeyancheta, I'm glad you find this useful :-)

    You probably want to change the line filtered_users = list(filter(lambda u: u.get('PasswordLastUsed'), users))

    If you have a complex policy to check, let's create a dedicated function we invoke (I haven't tested this):

    def is_user_interesting(user):
        tags = user.get('Tags') or []  # avoid iterating over None when the user has no tags
    
        for tag in tags:
            if tag['Key'] == 'Foo' and tag['Value'] == "Buzz":
                return True # We found the tag we are interested in!
    
        return False # If we are here, it means no tag matched, or there isn't any tag associated to the user

    Then, we provide this new function to the filter invocation:

    filtered_users = list(filter(is_user_interesting, users))
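One caveat with the untested function above: as far as I know, the user objects returned by list_users do not include tags, so user.get('Tags') may always come back empty. The tags can be fetched per user with iam_client.list_user_tags(UserName=...), as found later in this thread. Below is a minimal sketch of the matching logic, kept separate from the AWS call so it can be tried locally. The names has_tag and fetch_tags, and the Foo/Buzz values, are placeholders and not part of the original script.

```python
def has_tag(tags, key, value):
    """Return True if a list of {'Key': ..., 'Value': ...} dicts contains the pair."""
    return any(t.get('Key') == key and t.get('Value') == value for t in tags or [])


def is_user_interesting(user, fetch_tags):
    # fetch_tags is a hypothetical callable returning the tag list for a user name,
    # e.g. lambda name: iam_client.list_user_tags(UserName=name)['Tags']
    return has_tag(fetch_tags(user['UserName']), 'Foo', 'Buzz')


# Local example with fake data instead of a real IAM call
fake_tags = {'alice': [{'Key': 'Foo', 'Value': 'Buzz'}], 'bob': []}
users = [{'UserName': 'alice'}, {'UserName': 'bob'}]
filtered_users = [u for u in users if is_user_interesting(u, fake_tags.get)]
print([u['UserName'] for u in filtered_users])
```

Passing the tag lookup in as a callable keeps the filter testable without AWS credentials; in the Lambda you would pass a function that wraps list_user_tags.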
  • joeyancheta @joeyancheta ·

    Thanks for the reply @rpadovani

    Unfortunately, I am still getting an error when using your new function.

    I am getting closer though. I worked with a friend and we were able to build this new function. We are getting the tags, but aren't able to get the send notifications to go out.

        for keys_for_user in interesting_keys_grouped_by_user.values():
          user_name = keys_for_user[0]['UserName']
          print(user_name)
          user_details = iam_client.get_user(UserName=user_name)
          email = None
          print(user_details)
          for tag in user_details.get('Tags', []):
            if tag['Key'] == 'Email':
              email = tag['Value']
          if email is None:
            # error, the user has no email tag
            continue
          send_notification(email, keys_for_user, context.invoked_function_arn.split(":")[4])
    Edited by joeyancheta
  • joeyancheta @joeyancheta ·

    @rpadovani I just wanted to give you an update on my project. We actually need to use user_details = iam_client.list_user_tags(UserName=user_name) to get the tags.

    This is the final snippet that was added to your script.

        for keys_for_user in interesting_keys_grouped_by_user.values():
          user_name = keys_for_user[0]['UserName']
          print(user_name)
          user_details = iam_client.list_user_tags(UserName=user_name)
          email = None
          print(user_details)
          for tag in user_details.get('Tags', []):
            if tag['Key'] == 'CheckAccessKeyAge':
              email = tag['Value']
          if email is None:
            # error, the user has no email tag
            continue
          send_notification(email, keys_for_user, context.invoked_function_arn.split(":")[4])
    Edited by joeyancheta
  • Thanks for the update @joeyancheta, I am happy you were able to make it work for your use case :-)

  • Hi @rpadovani,

    I got an error when trying to run the script in my Lambda environment. Could you please tell me if it makes sense?

    Function logs:
    START RequestId: 1b0e3a07-06e9-40e9-9612-6557829f41a4 Version: $LATEST
    [ERROR]	2021-01-19T20:57:54.492Z	1b0e3a07-06e9-40e9-9612-6557829f41a4	Missing final '@domain'
    [ERROR]	2021-01-19T20:57:54.578Z	1b0e3a07-06e9-40e9-9612-6557829f41a4	Missing final '@domain'
    [ERROR]	2021-01-19T20:57:54.593Z	1b0e3a07-06e9-40e9-9612-6557829f41a4	Missing final '@domain'

    Systems setup (based off your blog post): IAM SES

  • Hi @scott178, the error seems to indicate that the usernames of your users aren't actual email addresses, hence the Missing final '@domain'. You can save your users' email addresses in a tag, or, if the domain is always the same, you can change line 59 from Destination={'ToAddresses': [email]}, to Destination={'ToAddresses': [email + "@example.com"]},.
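
A minimal sketch of the "same domain for everybody" option, assuming every user without an email-shaped username belongs to one domain. The helper to_email and the constant DEFAULT_DOMAIN are hypothetical names, not part of the original script.

```python
DEFAULT_DOMAIN = 'example.com'  # assumption: all users share this domain


def to_email(user_name, domain=DEFAULT_DOMAIN):
    """Return user_name unchanged if it already contains '@', else append the domain."""
    return user_name if '@' in user_name else f'{user_name}@{domain}'


print(to_email('scott'))
print(to_email('scott@other.org'))
```

The result would then be passed as the first argument of send_notification instead of the bare username.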

    Let me know if this helps!

  • Hi @rpadovani,

    This is really nice code, but it doesn't work on my system.

    ALERT_AFTER_N_DAYS = 3
    SEND_EVERY_N_DAYS = 3 

    My access key age is 2 days. The Lambda function permission is Adminfullaccess. Is there anything I am missing? The function runs fine and returns code 200.

    Thank you bro!!

  • Hi @dlfp125, I'm glad you like the code. However, I haven't understood what problem you're facing. Does the script send the alert before the due date?

  • Oh, @rpadovani,

    Thank you for your answer. The cloudwatch event alarm runs lambda function every five minutes. There is no error and it works normally, but the email and key are not updated.

    Edited by loanshark
  • Hi @dlfp125, I am not sure I've understood what you are trying to achieve: this script doesn't update any email or key, it just sends an email notification when your access key is older than N days.

    It is useful to remind people to rotate keys when they are too old (the keys, not the people).
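
To make the "older than N days" rule concrete, here is a small sketch of the same check used by the script's is_key_interesting helper, run against made-up keys. The optional now parameter is an addition for local testing and is not in the original function. With ALERT_AFTER_N_DAYS = 3, a key that is 2 days old does not trigger any email.

```python
from datetime import datetime, timedelta, timezone

ALERT_AFTER_N_DAYS = 3


def is_key_interesting(key, now=None):
    # `now` is injectable only so the example is reproducible
    now = now or datetime.now(timezone.utc)
    if key['Status'] != 'Active':
        return False
    return (now - key['CreateDate']).days >= ALERT_AFTER_N_DAYS


now = datetime(2021, 1, 20, tzinfo=timezone.utc)
two_days_old = {'Status': 'Active', 'CreateDate': now - timedelta(days=2)}
four_days_old = {'Status': 'Active', 'CreateDate': now - timedelta(days=4)}

print(is_key_interesting(two_days_old, now))   # False: younger than 3 days
print(is_key_interesting(four_days_old, now))  # True: old enough to notify
```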

  • Ah! Thank you @rpadovani,

    I wasn't getting it right.

  • Sameer Goyal @Sameergoyal03 ·

    @rpadovani Can we get an enhanced version of the code that automates key rotation, for example creating, disabling and deleting the old keys for all users in one script?

    Edited by Sameer Goyal
  • Hi @Sameergoyal03, I am not sure if I understood what kind of script you are looking for. A good starting point would be https://aws.amazon.com/blogs/security/how-to-rotate-access-keys-for-iam-users/

  • Shedrack @dozieaji1 ·

    Hi Riccardo,

    This is really cool, thanks for taking time to come up with this solution.

    Please, I am getting this error when I run the function:

    { "errorMessage": "send_notification() missing 1 required positional argument: 'account_id'", "errorType": "TypeError", "stackTrace": [ " File "/var/runtime/bootstrap.py", line 127, in handle_event_request\n response = request_handler(event, lambda_context)\n" ] }

    Let me know what I am doing wrong.

    thanks

  • Hi @dozieaji1, have you changed the code? It seems that here: send_notification(user[0]['UserName'], user, context.invoked_function_arn.split(":")[4]) you are not passing the account id...
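
For reference, a small sketch of what that third argument is: the account id is the fifth colon-separated field of the Lambda function ARN. The send_notification below is only a stand-in that returns a string, and the ARN and key values are made up.

```python
def account_id_from_arn(arn):
    # ARN layout: arn:partition:service:region:account-id:resource
    return arn.split(':')[4]


def send_notification(email, keys, account_id):
    # Stand-in for the real function: all three arguments are required
    return f'to={email} keys={len(keys)} account={account_id}'


arn = 'arn:aws:lambda:us-east-1:123456789012:function:key-age-reminder'
keys = [{'UserName': 'alice@example.com', 'AccessKeyId': 'AKIAEXAMPLE'}]

message = send_notification(keys[0]['UserName'], keys, account_id_from_arn(arn))
print(message)
```

Calling send_notification with only two arguments raises the same TypeError reported above.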

  • Abhishek Madan @iamabhishek2706 ·

    Hi @rpadovani,

    I have read the full code. If I want to fetch the email address from tags, what should the tag on the IAM user be called, and what changes are needed in the code, and where?

    TBH, I don't have much knowledge of coding.

    I was trying to run this code but could not. I am getting the error "Missing final '@domain'".

    Can you please help? This is a bit urgent!

    Thanks!

    Edited by Abhishek Madan
  • Hello @iamabhishek2706, the tag name is up to you, I don't know how you manage your IAM users.

    Tags are a list of Key/Value pairs attached to the IAM user, which you can fetch with iam_client.list_user_tags(UserName=...). So, instead of passing user[0]['UserName'], you pass the Value of the tag whose Key is 'YourTagKey'.

  • Abhishek Madan @iamabhishek2706 ·

    Hi @rpadovani,

    Thanks for your update.

    I understand what you said, but I want to send the notifications to the email addresses stored in tags. In my environment the username is not the email address, so I created a custom tag "Email : test@test.com". How do I fetch the email address from the tags and use it as the destination?

    I also need some more help: I am not able to set up the code above. Could you please provide the full setup steps? It would be a great help.

    Steps I did: created the IAM role, attached that role to the Lambda function.

    I am not able to set up SES. Can you please guide me through setting up SES for the Lambda function above?

    It would be a great help. Thanks, and please respond ASAP, it's a little bit urgent!

    Edited by Abhishek Madan
  • Hi @iamabhishek2706,

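    If you already have the tag on your IAM users, then it should be as easy as replacing the first argument of send_notification(user[0]['UserName'], user, context.invoked_function_arn.split(":")[4]) with the value of the Email tag. Note that the entries in user are access-key records and do not carry tags, so the tags have to be fetched separately, for example with iam_client.list_user_tags(UserName=user[0]['UserName']), which returns a list of Key/Value pairs under Tags.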
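
A minimal sketch of the tag lookup, assuming the tag key is Email. The helper email_from_tags is a hypothetical name, and FakeIamClient only imitates the response shape of the real list_user_tags call so the example runs without AWS access.

```python
def email_from_tags(tags, tag_key='Email'):
    """Return the Value of the first tag whose Key matches, or None."""
    for tag in tags or []:
        if tag.get('Key') == tag_key:
            return tag.get('Value')
    return None


class FakeIamClient:
    """Stand-in for boto3.client('iam'); mimics the list_user_tags response shape."""

    def list_user_tags(self, UserName):
        data = {'alice': [{'Key': 'Email', 'Value': 'test@test.com'}]}
        return {'Tags': data.get(UserName, []), 'IsTruncated': False}


iam_client = FakeIamClient()
for user_name in ('alice', 'bob'):
    response = iam_client.list_user_tags(UserName=user_name)
    print(user_name, email_from_tags(response.get('Tags')))
```

Users for whom the helper returns None have no such tag and should be skipped (or logged) instead of being passed to send_notification.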

  • Abhishek Madan @iamabhishek2706 ·

    Hi @rpadovani,

    Thanks for the update,

    Test.py

    I have attached the code, which I customized according to your instructions. When I run it there is no error, but the emails are not sent. Can you please have a look and review it?

    Your help is really appreciated, thanks.

  • Hello @iamabhishek2706, I've taken a quick look at the script, and there isn't anything that makes me think it should fail.

    There are different possible issues:

    • There isn't any key that is older than one day
    • Your email is not authorized to send SES emails
    • The tag with the emails isn't actually called Email
    • Some IAM permissions, perhaps?

    You need somebody with proper access to the account to debug it.

  • oludare odo @dreyo143143 ·

    Hi Riccardo, I came across your code and it's what I have been looking for. I have a quick question, if you don't mind: I understand your code notifies users to rotate their old access keys, but can you help with my scenario, which is to notify all users and then automatically delete the access key after 2 weeks if the old key is still there? Thanks for what you do for the community.

    Edited by oludare odo
  • Hi @dreyo143143, thanks for your kind words!

    First, remember that IAM keys must be deactivated before being deleted.

    For your use case, the code needs to be changed quite a bit to keep it clean. However, the summary would be:

    • we retrieve the keys we are interested in, as we have already done
    • we create a second filter, to check if the key is older than ALERT_AFTER_N_DAYS + 2 weeks
    • if the key passes this filter as well, we deactivate the key (and delete it as well, if you'd like)
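
The steps above can be sketched as follows. This is an untested outline under a few assumptions: GRACE_PERIOD_DAYS, is_key_expired and retire_key are hypothetical names, and since the script uses the boto3 IAM client (where keys are plain dictionaries), deactivation is done with update_access_key(..., Status='Inactive') and deletion with delete_access_key. The RecordingClient only records calls so the example runs without touching a real account.

```python
from datetime import datetime, timedelta, timezone

ALERT_AFTER_N_DAYS = 100
GRACE_PERIOD_DAYS = 14  # hypothetical name for the extra two weeks


def is_key_expired(key, now=None):
    """Second filter: the key is older than ALERT_AFTER_N_DAYS + 2 weeks."""
    now = now or datetime.now(timezone.utc)
    return (now - key['CreateDate']).days >= ALERT_AFTER_N_DAYS + GRACE_PERIOD_DAYS


def retire_key(iam_client, key, delete=False):
    # Deactivate first, then optionally delete
    iam_client.update_access_key(
        UserName=key['UserName'], AccessKeyId=key['AccessKeyId'], Status='Inactive')
    if delete:
        iam_client.delete_access_key(
            UserName=key['UserName'], AccessKeyId=key['AccessKeyId'])


class RecordingClient:
    """Stand-in for boto3.client('iam') that only records the calls it receives."""

    def __init__(self):
        self.calls = []

    def update_access_key(self, **kwargs):
        self.calls.append(('update_access_key', kwargs))

    def delete_access_key(self, **kwargs):
        self.calls.append(('delete_access_key', kwargs))


now = datetime(2021, 6, 1, tzinfo=timezone.utc)
old_key = {'UserName': 'alice', 'AccessKeyId': 'AKIAEXAMPLE',
           'CreateDate': now - timedelta(days=135)}

client = RecordingClient()
if is_key_expired(old_key, now):
    retire_key(client, old_key, delete=True)
print([name for name, _ in client.calls])
```

Deleting a key is irreversible, so it may be safer to start with delete=False and only deactivate until the workflow has proven itself.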
  • oludare odo @dreyo143143 ·

    Hi @rpadovani, thanks for your quick response. I am sorry for asking these questions, I am just starting to learn Python. I have 2 more questions:

    1. If I use your code as it is, will my users receive the notification in their email (the usernames in my account are valid email addresses)? Also, will I have to change anything in this last line: send_notification(user[0]['UserName'], user, context.invoked_function_arn.split(":")[4])?

    2. Do you think it's good practice to have one Lambda function that sends alerts to users to rotate their keys if too old, and another Lambda function that runs 2 weeks later to deactivate and delete the old keys?

  • Hey @dreyo143143,

    1. If I use your code as it is, will my users receive the notification in their email (the usernames in my account are valid email addresses)? Also, will I have to change anything in this last line: send_notification(user[0]['UserName'], user, context.invoked_function_arn.split(":")[4])?

    If the usernames are indeed valid email addresses, this piece of code should just work :-)

    2. Do you think it's good practice to have one Lambda function that sends alerts to users to rotate their keys if too old, and another Lambda function that runs 2 weeks later to deactivate and delete the old keys?

    I don't see any advantage in the case of this simple workflow: the IAM permissions needed are quite similar, and so are the API calls.

    However, if the functions will be part of a bigger workflow (e.g., some step functions), then of course it could make sense to split them!

  • oludare odo @dreyo143143 ·

    @riccardo I'm reporting back: when I ran your code as it is, it sent notifications to my users' inboxes. Thank you so much. Just one more question.

    Please, if you could help: how do I have them receive an alert first, let's say 2 weeks before, and then delete the access key in the same Lambda function? What additional code do you think I need to add to do that?

  • oludare odo @dreyo143143 ·

    Hi, it's me again. You can ignore my previous 2 questions. Below is the complete code that I ran in my environment. It did not generate any error, but it did not send email to my user through the tag, and it also did not detect the 135-day-old access key in my environment. Can you please take a look? The following are not the issue:

    • There isn't any key that is older than one day: there is a key that is 135 days old
    • Your email is not authorized to send SES emails: the email address is verified
    • The tag with the emails isn't actually called Email: the tag key is Email and the value is the email address
    • Some IAM permissions, perhaps? The Lambda has all the necessary IAM permissions. Thanks for your help.

        from collections import defaultdict
        from datetime import datetime, timezone
        import logging

        import boto3
        from botocore.exceptions import ClientError

        # How many days before sending alerts about the key age?
        ALERT_AFTER_N_DAYS = 100
        # How often we have set the cron to run the Lambda?
        SEND_EVERY_N_DAYS = 3
        # Who sends the email?
        SES_SENDER_EMAIL_ADDRESS = '1234@gmail.com'
        # Where did we set up SES?
        SES_REGION_NAME = 'us-east-1'

        iam_client = boto3.client('iam')
        ses_client = boto3.client('ses', region_name=SES_REGION_NAME)


        # Helper function to choose if a key owner should be notified today
        def is_key_interesting(key):
            # If the key is inactive, it is not interesting
            if key['Status'] != 'Active':
                return False

            elapsed_days = (datetime.now(timezone.utc) - key['CreateDate']).days

            # If the key is newer than ALERT_AFTER_N_DAYS, we don't need to notify the
            # owner
            if elapsed_days < ALERT_AFTER_N_DAYS:
                return False

            return True


        # Helper to send the notification to the user. We need the receiver email,
        # the keys we want to notify the user about, and on which account we are
        def send_notification(email, keys, account_id):
            email_text = f'''Dear {keys[0]['UserName']},
        this is an automatic reminder to rotate your AWS Access Keys at least every {ALERT_AFTER_N_DAYS} days.

        At the moment, you have {len(keys)} key(s) on the account {account_id} that have been created more than {ALERT_AFTER_N_DAYS} days ago:
        '''
            for key in keys:
                email_text += f"- {key['AccessKeyId']} was created on {key['CreateDate']} ({(datetime.now(timezone.utc) - key['CreateDate']).days} days ago)\n"

            email_text += f"""
        To learn how to rotate your AWS Access Key, please read the official guide at https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_RotateAccessKey
        If you have any question, please don't hesitate to contact the Support Team at support@example.com.

        This automatic reminder will be sent again in {SEND_EVERY_N_DAYS} days, if the key(s) will not be rotated.

        Regards,
        Your lovely Support Team
        """

            try:
                ses_response = ses_client.send_email(
                    Destination={'ToAddresses': [email]},
                    Message={
                        'Body': {'Html': {'Charset': 'UTF-8', 'Data': email_text}},
                        'Subject': {'Charset': 'UTF-8',
                                    'Data': f'Remember to rotate your AWS Keys on account {account_id}!'}
                    },
                    Source=SES_SENDER_EMAIL_ADDRESS
                )
            except ClientError as e:
                logging.error(e.response['Error']['Message'])
            else:
                logging.info(f'Notification email sent successfully to {email}! Message ID: {ses_response["MessageId"]}')


        def lambda_handler(event, context):
            users = []
            is_truncated = True
            marker = None

            # We retrieve all users associated to the AWS Account.
            # Results are paginated, so we go on until we have them all
            while is_truncated:
                # This strange syntax is here because `list_users` doesn't accept an
                # invalid Marker argument, so we specify it only if it is not None
                response = iam_client.list_users(**{k: v for k, v in (dict(Marker=marker)).items() if v is not None})
                users.extend(response['Users'])
                is_truncated = response['IsTruncated']
                marker = response.get('Marker', None)

            # Probably in this list you have bots, or users you want to filter out
            # You can filter them by associated tags, or as I do here, just filter out
            # all the accounts that haven't logged in the web console at least once
            # (probably they aren't users)
            filtered_users = list(filter(lambda u: u.get('PasswordLastUsed'), users))

            interesting_keys = []

            # For every user, we want to retrieve the related access keys
            for user in filtered_users:
                response = iam_client.list_access_keys(UserName=user['UserName'])
                access_keys = response['AccessKeyMetadata']

                # We are interested only in Active keys, older than
                # ALERT_AFTER_N_DAYS days
                interesting_keys.extend(list(filter(lambda k: is_key_interesting(k), access_keys)))

            # We group the keys by owner, so we send no more than one notification for every user
            interesting_keys_grouped_by_user = defaultdict(list)
            for key in interesting_keys:
                interesting_keys_grouped_by_user[key['UserName']].append(key)

            for user in interesting_keys_grouped_by_user.values():
                # In our AWS account the username is always a valid email.
                # However, you can recover the email from IAM tags, if you have them
                # or from other lookups
                # We also get the account id from the Lambda context, but you can
                # also specify any id you want here, it's only used in the email
                # sent to the users to let them know on which account they should
                # check
                send_notification(user[0]['UserName'], user, context.invoked_function_arn.split(":")[4])

            for keys_for_user in interesting_keys_grouped_by_user.values():
                user_name = keys_for_user[0]['UserName']
                print(user_name)
                user_details = iam_client.list_user_tags(UserName=user_name)
                email = None
                print(user_details)
                for tag in user_details.get('Tags', []):
                    if tag['Key'] == 'CheckAccessKeyAge':
                        email = tag['Value']
                if email is None:
                    # error, the user has no email tag
                    continue
                send_notification(email, keys_for_user, context.invoked_function_arn.split(":")[4])
    Edited by oludare odo
  • Hi @dreyo143143, quite a busy week, thanks for the patience.

    About how to delete the key: you should iterate over interesting_keys, check the date once again, and then invoke the function to delete that key.

    About the code not working, it is hard to debug without having access to the environment, and without logs.

    If I were you, I would print the key information in the logs, and try to understand where things break.
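
A minimal sketch of what such logging could look like. One assumption worth checking in your environment: to my knowledge the Lambda Python runtime leaves the root logger at WARNING by default, so the script's logging.info(...) lines do not appear in CloudWatch unless the level is lowered. The helper describe_run and its sample inputs are made up for illustration.

```python
import logging

# Lower the root logger level so INFO messages are emitted too
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def describe_run(users, filtered_users, interesting_keys):
    """Build a one-line summary of how many items survived each stage."""
    return (f'{len(users)} users listed, {len(filtered_users)} after filtering, '
            f'{len(interesting_keys)} keys old enough to notify')


# In lambda_handler these would be the real lists built by the script
summary = describe_run(users=[1, 2, 3], filtered_users=[1], interesting_keys=[])
logger.info(summary)
print(summary)
```

If the count drops to zero after filtering, the PasswordLastUsed filter is the first place to look; if it drops to zero at the keys stage, check ALERT_AFTER_N_DAYS and the key status.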

    If you need additional help, I am available for professional consulting, just email me.

  • Hi Riccardo! Great tool, and it is working fine. I added an SNS topic as destination, but it shows a failure or success result. I'd like to get the usernames of the users who were notified about old keys. What should I add to the existing Lambda code, or how else can I do that? Thank you

  • Hi @rpadovani, great work u r doing. Wanna ask about your insights on your lambda function.

    I have the same problem as @joeyancheta: my usernames aren't email addresses, and I need to use tag values to define their email addresses, so I basically copied from both of your codes and combined them. I was able to receive email in my initial testing (setting ALERT_AFTER_N_DAYS = 0, with the EventBridge rule set to 1 minute so that I get the alerts immediately), but the email alert was only received for 1 account. I have tagged other accounts with the same email address but they didn't receive anything (I confirmed they have keys older than 130 days).

    Any help on the matter is GREATLY APPRECIATED!

    This is the code I'm using:

        from collections import defaultdict
        from datetime import datetime, timezone
        import logging

        import boto3
        from botocore.exceptions import ClientError

        # How many days before sending alerts about the key age?
        ALERT_AFTER_N_DAYS = 15
        # How often we have set the cron to run the Lambda?
        SEND_EVERY_N_DAYS = 3
        # Who sends the email?
        SES_SENDER_EMAIL_ADDRESS = 'aws-reminders@sal.org.sg'
        # Where did we set up SES?
        SES_REGION_NAME = 'ap-southeast-1'

        iam_client = boto3.client('iam')
        ses_client = boto3.client('ses', region_name=SES_REGION_NAME)


        # Helper function to choose if a key owner should be notified today
        def is_key_interesting(key):
            # If the key is inactive, it is not interesting
            if key['Status'] != 'Active':
                return False

            elapsed_days = (datetime.now(timezone.utc) - key['CreateDate']).days

            # If the key is newer than ALERT_AFTER_N_DAYS, we don't need to notify the
            # owner
            if elapsed_days < ALERT_AFTER_N_DAYS:
                return False

            return True


        # Helper to send the notification to the user. We need the receiver email,
        # the keys we want to notify the user about, and on which account we are
        def send_notification(email, keys, account_id):
            email_text = f'''Dear {keys[0]['UserName']},
        this is an automatic reminder to rotate your AWS Access Keys at least every {ALERT_AFTER_N_DAYS} days.

        At the moment, you have {len(keys)} key(s) on the account {account_id} (LAWNET 3) that have been created more than {ALERT_AFTER_N_DAYS} days ago:
        '''
            for key in keys:
                email_text += f"- {key['AccessKeyId']} was created on {key['CreateDate']} ({(datetime.now(timezone.utc) - key['CreateDate']).days} days ago)\n"

            email_text += f"""
        To learn how to rotate your AWS Access Key, please read the official guide at https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html#Using_RotateAccessKey

        This automatic reminder will be sent again in {SEND_EVERY_N_DAYS} days, if the key(s) are not rotated.
        Thank you.
        """

            try:
                ses_response = ses_client.send_email(
                    Destination={'ToAddresses': [email]},
                    Message={
                        'Body': {'Html': {'Charset': 'UTF-8', 'Data': email_text}},
                        'Subject': {'Charset': 'UTF-8',
                                    'Data': f'Remember to rotate your AWS Keys on account {account_id} LAWNET 3!'}
                    },
                    Source=SES_SENDER_EMAIL_ADDRESS
                )
            except ClientError as e:
                logging.error(e.response['Error']['Message'])
            else:
                logging.info(f'Notification email sent successfully to {email}! Message ID: {ses_response["MessageId"]}')


        def lambda_handler(event, context):
            users = []
            is_truncated = True
            marker = None

            # We retrieve all users associated to the AWS Account.
            # Results are paginated, so we go on until we have them all
            while is_truncated:
                # This strange syntax is here because `list_users` doesn't accept an
                # invalid Marker argument, so we specify it only if it is not None
                response = iam_client.list_users(**{k: v for k, v in (dict(Marker=marker)).items() if v is not None})
                users.extend(response['Users'])
                is_truncated = response['IsTruncated']
                marker = response.get('Marker', None)

            # Probably in this list you have bots, or users you want to filter out
            # You can filter them by associated tags, or as I do here, just filter out
            # all the accounts that haven't logged in the web console at least once
            # (probably they aren't users)
            filtered_users = list(filter(lambda u: u.get('PasswordLastUsed'), users))

            interesting_keys = []

            # For every user, we want to retrieve the related access keys
            for user in filtered_users:
                response = iam_client.list_access_keys(UserName=user['UserName'])
                access_keys = response['AccessKeyMetadata']

                # We are interested only in Active keys, older than
                # ALERT_AFTER_N_DAYS days
                interesting_keys.extend(list(filter(lambda k: is_key_interesting(k), access_keys)))

            # We group the keys by owner, so we send no more than one notification for every user
            interesting_keys_grouped_by_user = defaultdict(list)
            for key in interesting_keys:
                interesting_keys_grouped_by_user[key['UserName']].append(key)

            for keys_for_user in interesting_keys_grouped_by_user.values():
                user_name = keys_for_user[0]['UserName']
                print(user_name)
                user_details = iam_client.list_user_tags(UserName=user_name)
                email = None
                print(user_details)
                for tag in user_details.get('Tags', []):
                    if tag['Key'] == 'CheckAccessKeyAge':
                        email = tag['Value']
                if email is None:
                    # error, the user has no email tag
                    continue
                send_notification(email, keys_for_user, context.invoked_function_arn.split(":")[4])
    Edited by Ken Lee
  • Hi Riccardo! Great tool, and it is working fine. I added an SNS topic as destination, but it shows a failure or success result. I'd like to get the usernames of the users who were notified about old keys. What should I add to the existing Lambda code, or how else can I do that? Thank you

    Hi @dport2531, I am uncertain if I understand what you mean: would you like to have a summary on SNS? I suppose it then depends on what you want to write on SNS, can you share a snippet of the code you wrote?

    I have tagged other accounts with the same email address but they didn't receive anything (I confirmed they have keys more than 130 days).

    Hi @kenlcc, did the users log in at least once? What's the key of the tag you are using?

  • Hi @rpadovani, now that you mention it, that IAM account doesn't have console access: it's an IAM account created specifically for an S3 access key, and I'm the person managing it (so the alert should be sent to my email address). I have other users (with console access) that use different email addresses, and all of them are able to receive the alerts. The tag's key is CheckAccessKeyAge and the value is my email address. Note that I have another IAM account, used for console access, with the same key:value as this one, and that console account is able to receive the alert.

    Thanks.

  • Hi @kenlcc, I am happy it now works, and it is useful :-)

    As you probably have noticed, the behavior is managed by this line:

        filtered_users = list(filter(lambda u: u.get('PasswordLastUsed'), users))
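
A small illustration of what that line does, using made-up user records in the shape returned by list_users. The assumption here is that a user who has never signed in to the console has no PasswordLastUsed entry, so u.get('PasswordLastUsed') returns None and the user is dropped.

```python
from datetime import datetime, timezone

users = [
    {'UserName': 'console-user',
     'PasswordLastUsed': datetime(2021, 1, 1, tzinfo=timezone.utc)},
    {'UserName': 's3-service-account'},  # never logged in: no PasswordLastUsed key
]

# Same filter as in the script: keeps only users that logged in at least once
filtered_users = list(filter(lambda u: u.get('PasswordLastUsed'), users))
print([u['UserName'] for u in filtered_users])
```

This is why an account created only for access keys is never notified, regardless of its tags or key age.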
  • @rpadovani, thanks for the fast reply. So is it possible to just remove this line so that all the accounts will get notified whether it has been logged in before or not? Thanks.

  • Hi @kenlcc,

    if you remove the line, then you also need to change

    for user in filtered_users:

    to

    for user in users:

    I hope this helps! R.

  • Hi @rpadovani,

    Thanks for the advice. I'll test it out.

  • Hi @rpadovani,

    Update: I have tested it and it is working! thank you so much for your help.
