How To Rotate Secrets In AWS Secrets Manager
Hello Everyone
Welcome to CloudAffaire and this is Debjeet.
In today’s blog post, we will discuss how to rotate secrets in AWS Secrets Manager automatically. Rotating a secret means updating both the secret stored in Secrets Manager and the credentials on the service or application (for example, a database) that uses that secret. You can rotate a secret either manually or automatically. To rotate a secret automatically, you also need to set up a Lambda function with a proper IAM role.
There are two rotation strategies that you can adopt to rotate your secrets:
- Single user rotation strategy: Updates credentials for one user in one secret.
- Alternating user rotation strategy: Updates credentials for two users in one secret.
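The difference between the two strategies can be sketched in a few lines of Python. This is only an illustration, not code from Secrets Manager: the function name and the `_clone` suffix for the second user are assumptions chosen for the example.

```python
def next_rotation_user(current_user: str, strategy: str,
                       clone_suffix: str = "_clone") -> str:
    """Return the database user whose password the next rotation changes.

    "single" always rotates the same user. "alternating" flips between a
    user and its clone, so one of the two always keeps a working password.
    The "_clone" suffix is a hypothetical naming convention for this sketch.
    """
    if strategy == "single":
        return current_user
    if strategy == "alternating":
        if current_user.endswith(clone_suffix):
            return current_user[: -len(clone_suffix)]
        return current_user + clone_suffix
    raise ValueError(f"unknown rotation strategy: {strategy}")
```

With the single user strategy the old password stops working as soon as the new one is set, which is what the Lambda function in this post does for the admin user.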
Prerequisites:
- AWS CLI installed and configured with proper access. You can use the links below to install and configure the AWS CLI. The commands in this post also use jq, zip, pip and the mysql client.
https://cloudaffaire.com/how-to-install-aws-cli/
https://cloudaffaire.com/how-to-configure-aws-cli/
Step 1: Create an RDS MySQL instance.
The connection details for this RDS instance will serve as the secret that we will store in AWS Secrets Manager and rotate using a Lambda function.
## ---------------------------
## Create A MYSQL RDS Instance
## ---------------------------

## Generate connection string details for your RDS MySQL instance
DB_ADMIN_USERNAME="admin" &&
DB_ENGINE="mysql" &&
DB_INSTANCE_NAME="mysqldbinstance" &&
DB_NAME="mydb" &&
DB_ADMIN_PASSWORD=$(aws secretsmanager get-random-password \
  --password-length 20 \
  --no-exclude-numbers \
  --exclude-punctuation \
  --no-exclude-uppercase \
  --no-exclude-lowercase \
  --no-include-space \
  --require-each-included-type | jq -r .RandomPassword)

## Create a new RDS MySQL instance
aws rds create-db-instance \
  --db-instance-identifier $DB_INSTANCE_NAME \
  --db-instance-class db.t3.micro \
  --engine $DB_ENGINE \
  --db-name $DB_NAME \
  --master-username $DB_ADMIN_USERNAME \
  --master-user-password $DB_ADMIN_PASSWORD \
  --allocated-storage 20

## Get RDS instance creation status
aws rds describe-db-instances \
  --db-instance-identifier $DB_INSTANCE_NAME \
  --query 'DBInstances[?DBInstanceIdentifier == `mysqldbinstance`].DBInstanceStatus'

## Proceed once DBInstanceStatus: "available"

## Get RDS instance host name
DB_HOST=$(aws rds describe-db-instances \
  --db-instance-identifier $DB_INSTANCE_NAME \
  --query 'DBInstances[?DBInstanceIdentifier == `mysqldbinstance`].Endpoint.Address' \
  --output text)

## Test RDS MySQL connection
mysql -u $DB_ADMIN_USERNAME -p$DB_ADMIN_PASSWORD -h $DB_HOST $DB_NAME

quit; #to exit mysql connection

## Install mysql-client if you get an error executing mysql command
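If you prefer to script the "proceed once available" check instead of re-running describe-db-instances by hand, a polling helper along these lines may help. It is a minimal sketch: the function name and parameters are made up for this example, and it assumes you pass in a boto3 RDS client (for instance `boto3.client("rds")`).

```python
import time


def wait_for_db_available(rds_client, instance_id, poll_seconds=30,
                          max_attempts=60, sleep=time.sleep):
    """Poll describe_db_instances until the instance reports 'available'.

    Returns the endpoint host name, i.e. the value stored in DB_HOST
    by the shell commands in this step.
    """
    for _ in range(max_attempts):
        response = rds_client.describe_db_instances(
            DBInstanceIdentifier=instance_id)
        instance = response["DBInstances"][0]
        if instance["DBInstanceStatus"] == "available":
            return instance["Endpoint"]["Address"]
        sleep(poll_seconds)
    raise TimeoutError(
        f"{instance_id} was not available after {max_attempts} checks")


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   host = wait_for_db_available(boto3.client("rds"), "mysqldbinstance")
```

The `sleep` parameter is only there so the loop can be exercised without real waiting; boto3 also ships a built-in `db_instance_available` waiter that does the same job.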
Step 2: Create a new secret containing the RDS MySQL instance connection details.
## ---------------------------------------------------
## Create A Secret For RDS Instance Connection Details
## ---------------------------------------------------

## Create a secret file for RDS instance connection details
cat << EOF > mysql_secret.json
{
  "engine": "$DB_ENGINE",
  "host": "$DB_HOST",
  "username": "$DB_ADMIN_USERNAME",
  "password": "$DB_ADMIN_PASSWORD",
  "dbname": "$DB_NAME"
}
EOF

## Create a new secret in AWS secret manager
aws secretsmanager create-secret \
  --name mysqldbsecrets \
  --secret-string file://mysql_secret.json \
  --description "secret for mysql rds instance" \
  --tags 'Key=Name,Value=mysqldbsecret'
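Once the secret is rotated automatically, an application should read the credentials from Secrets Manager at connection time rather than keeping its own copy of the password. The sketch below shows one way to do that. The function name is hypothetical, and it assumes a boto3 Secrets Manager client is passed in (for instance `boto3.client("secretsmanager")`).

```python
import json

# Keys the rotation function in this post also treats as mandatory.
REQUIRED_KEYS = ("host", "username", "password")


def load_db_credentials(secrets_client, secret_id="mysqldbsecrets"):
    """Fetch the AWSCURRENT version of the secret and parse its JSON body."""
    response = secrets_client.get_secret_value(SecretId=secret_id)
    secret = json.loads(response["SecretString"])
    missing = [key for key in REQUIRED_KEYS if key not in secret]
    if missing:
        raise KeyError(f"secret {secret_id} is missing keys: {missing}")
    return secret


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   creds = load_db_credentials(boto3.client("secretsmanager"))
#   print(creds["host"], creds["username"])
```

When no version stage is given, get_secret_value returns the AWSCURRENT version, so the application picks up the new password after each rotation.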
Step 3: Create a custom Lambda function with an IAM role and a Lambda layer.
This Lambda function will update the RDS connection secret in AWS Secrets Manager and also update the admin user password in the MySQL RDS instance.
## ------------------------------------------------------
## Create A Lambda Function For Automatic Secret Rotation
## ------------------------------------------------------

## Create a file containing lambda function code
cat << 'EOF' > myfunction.py
import boto3
import json
import logging
import os
import pymysql

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """Secrets Manager RDS MySQL Handler

    This handler uses the single-user rotation scheme to rotate an RDS MySQL user credential. This rotation
    scheme logs into the database as the user and rotates the user's own password, immediately invalidating the
    user's previous password.

    The Secret SecretString is expected to be a JSON string with the following format:
    {
        'engine': required, must be set to 'mysql'
        'host': required
        'username': required
        'password': required
        'dbname': optional
        'port': optional, defaults to 3306
    }

    Args:
        event (dict): Lambda dictionary of event parameters. These keys must include the following:
            - SecretId: The secret ARN or identifier
            - ClientRequestToken: The ClientRequestToken of the secret version
            - Step: The rotation step (one of createSecret, setSecret, testSecret, or finishSecret)

        context (LambdaContext): The Lambda runtime information

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist

        ValueError: If the secret is not properly configured for rotation

        KeyError: If the secret json does not contain the expected keys

    """
    arn = event['SecretId']
    token = event['ClientRequestToken']
    step = event['Step']

    # Setup the client
    service_client = boto3.client('secretsmanager', endpoint_url=os.environ['SECRETS_MANAGER_ENDPOINT'])

    # Make sure the version is staged correctly
    metadata = service_client.describe_secret(SecretId=arn)
    if "RotationEnabled" in metadata and not metadata['RotationEnabled']:
        logger.error("Secret %s is not enabled for rotation" % arn)
        raise ValueError("Secret %s is not enabled for rotation" % arn)
    versions = metadata['VersionIdsToStages']
    if token not in versions:
        logger.error("Secret version %s has no stage for rotation of secret %s." % (token, arn))
        raise ValueError("Secret version %s has no stage for rotation of secret %s." % (token, arn))
    if "AWSCURRENT" in versions[token]:
        logger.info("Secret version %s already set as AWSCURRENT for secret %s." % (token, arn))
        return
    elif "AWSPENDING" not in versions[token]:
        logger.error("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))
        raise ValueError("Secret version %s not set as AWSPENDING for rotation of secret %s." % (token, arn))

    # Call the appropriate step
    if step == "createSecret":
        create_secret(service_client, arn, token)

    elif step == "setSecret":
        set_secret(service_client, arn, token)

    elif step == "testSecret":
        test_secret(service_client, arn, token)

    elif step == "finishSecret":
        finish_secret(service_client, arn, token)

    else:
        logger.error("lambda_handler: Invalid step parameter %s for secret %s" % (step, arn))
        raise ValueError("Invalid step parameter %s for secret %s" % (step, arn))


def create_secret(service_client, arn, token):
    """Generate a new secret

    This method first checks for the existence of a secret for the passed in token. If one does not exist, it will
    generate a new secret and put it with the passed in token.

    Args:
        service_client (client): The secrets manager service client

        arn (string): The secret ARN or other identifier

        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ValueError: If the current secret is not valid JSON

        KeyError: If the secret json does not contain the expected keys

    """
    # Make sure the current secret exists
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")

    # Now try to get the secret version, if that fails, put a new secret
    try:
        get_secret_dict(service_client, arn, "AWSPENDING", token)
        logger.info("createSecret: Successfully retrieved secret for %s." % arn)
    except service_client.exceptions.ResourceNotFoundException:
        # Get exclude characters from environment variable
        exclude_characters = os.environ['EXCLUDE_CHARACTERS'] if 'EXCLUDE_CHARACTERS' in os.environ else '/@"\'\\'
        # Generate a random password
        passwd = service_client.get_random_password(ExcludeCharacters=exclude_characters)
        current_dict['password'] = passwd['RandomPassword']

        # Put the secret
        service_client.put_secret_value(SecretId=arn, ClientRequestToken=token, SecretString=json.dumps(current_dict), VersionStages=['AWSPENDING'])
        logger.info("createSecret: Successfully put secret for ARN %s and version %s." % (arn, token))


def set_secret(service_client, arn, token):
    """Set the pending secret in the database

    This method tries to login to the database with the AWSPENDING secret and returns on success. If that fails, it
    tries to login with the AWSCURRENT and AWSPREVIOUS secrets. If either one succeeds, it sets the AWSPENDING password
    as the user password in the database. Else, it throws a ValueError.

    Args:
        service_client (client): The secrets manager service client

        arn (string): The secret ARN or other identifier

        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist

        ValueError: If the secret is not valid JSON or valid credentials are found to login to the database

        KeyError: If the secret json does not contain the expected keys

    """
    try:
        previous_dict = get_secret_dict(service_client, arn, "AWSPREVIOUS")
    except (service_client.exceptions.ResourceNotFoundException, KeyError):
        previous_dict = None
    current_dict = get_secret_dict(service_client, arn, "AWSCURRENT")
    pending_dict = get_secret_dict(service_client, arn, "AWSPENDING", token)

    # First try to login with the pending secret, if it succeeds, return
    conn = get_connection(pending_dict)
    if conn:
        conn.close()
        logger.info("setSecret: AWSPENDING secret is already set as password in MySQL DB for secret arn %s." % arn)
        return

    # Make sure the user from current and pending match
    if current_dict['username'] != pending_dict['username']:
        logger.error("setSecret: Attempting to modify user %s other than current user %s" % (pending_dict['username'], current_dict['username']))
        raise ValueError("Attempting to modify user %s other than current user %s" % (pending_dict['username'], current_dict['username']))

    # Make sure the host from current and pending match
    if current_dict['host'] != pending_dict['host']:
        logger.error("setSecret: Attempting to modify user for host %s other than current host %s" % (pending_dict['host'], current_dict['host']))
        raise ValueError("Attempting to modify user for host %s other than current host %s" % (pending_dict['host'], current_dict['host']))

    # Now try the current password
    conn = get_connection(current_dict)
    if not conn and previous_dict:
        # If both current and pending do not work, try previous
        conn = get_connection(previous_dict)

        # Make sure the user/host from previous and pending match
        if previous_dict['username'] != pending_dict['username']:
            logger.error("setSecret: Attempting to modify user %s other than previous valid user %s" % (pending_dict['username'], previous_dict['username']))
            raise ValueError("Attempting to modify user %s other than previous valid user %s" % (pending_dict['username'], previous_dict['username']))
        if previous_dict['host'] != pending_dict['host']:
            logger.error("setSecret: Attempting to modify user for host %s other than previous host %s" % (pending_dict['host'], previous_dict['host']))
            raise ValueError("Attempting to modify user for host %s other than previous host %s" % (pending_dict['host'], previous_dict['host']))

    # If we still don't have a connection, raise a ValueError
    if not conn:
        logger.error("setSecret: Unable to log into database with previous, current, or pending secret of secret arn %s" % arn)
        raise ValueError("Unable to log into database with previous, current, or pending secret of secret arn %s" % arn)

    # Now set the password to the pending password
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT VERSION()")
            ver = cur.fetchone()
            password_option = get_password_option(ver[0])
            cur.execute("SET PASSWORD = " + password_option, pending_dict['password'])
            conn.commit()
            logger.info("setSecret: Successfully set password for user %s in MySQL DB for secret arn %s." % (pending_dict['username'], arn))
    finally:
        conn.close()


def test_secret(service_client, arn, token):
    """Test the pending secret against the database

    This method tries to log into the database with the secrets staged with AWSPENDING and runs
    a permissions check to ensure the user has the correct permissions.

    Args:
        service_client (client): The secrets manager service client

        arn (string): The secret ARN or other identifier

        token (string): The ClientRequestToken associated with the secret version

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist

        ValueError: If the secret is not valid JSON or valid credentials are found to login to the database

        KeyError: If the secret json does not contain the expected keys

    """
    # Try to login with the pending secret, if it succeeds, return
    conn = get_connection(get_secret_dict(service_client, arn, "AWSPENDING", token))
    if conn:
        # This is where the lambda will validate the user's permissions. Modify the below lines to
        # tailor these validations to your needs
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT NOW()")
                conn.commit()
        finally:
            conn.close()

        logger.info("testSecret: Successfully signed into MySQL DB with AWSPENDING secret in %s." % arn)
        return
    else:
        logger.error("testSecret: Unable to log into database with pending secret of secret ARN %s" % arn)
        raise ValueError("Unable to log into database with pending secret of secret ARN %s" % arn)


def finish_secret(service_client, arn, token):
    """Finish the rotation by marking the pending secret as current

    This method finishes the secret rotation by staging the secret staged AWSPENDING with the AWSCURRENT stage.

    Args:
        service_client (client): The secrets manager service client

        arn (string): The secret ARN or other identifier

        token (string): The ClientRequestToken associated with the secret version

    """
    # First describe the secret to get the current version
    metadata = service_client.describe_secret(SecretId=arn)
    current_version = None
    for version in metadata["VersionIdsToStages"]:
        if "AWSCURRENT" in metadata["VersionIdsToStages"][version]:
            if version == token:
                # The correct version is already marked as current, return
                logger.info("finishSecret: Version %s already marked as AWSCURRENT for %s" % (version, arn))
                return
            current_version = version
            break

    # Finalize by staging the secret version current
    service_client.update_secret_version_stage(SecretId=arn, VersionStage="AWSCURRENT", MoveToVersionId=token, RemoveFromVersionId=current_version)
    logger.info("finishSecret: Successfully set AWSCURRENT stage to version %s for secret %s." % (token, arn))


def get_connection(secret_dict):
    """Gets a connection to MySQL DB from a secret dictionary

    This helper function tries to connect to the database grabbing connection info
    from the secret dictionary. If successful, it returns the connection, else None

    Args:
        secret_dict (dict): The Secret Dictionary

    Returns:
        Connection: The pymysql.connections.Connection object if successful. None otherwise

    Raises:
        KeyError: If the secret json does not contain the expected keys

    """
    # Parse and validate the secret JSON string
    port = int(secret_dict['port']) if 'port' in secret_dict else 3306
    dbname = secret_dict['dbname'] if 'dbname' in secret_dict else None

    # Try to obtain a connection to the db
    try:
        conn = pymysql.connect(host=secret_dict['host'], user=secret_dict['username'], password=secret_dict['password'], port=port, database=dbname, connect_timeout=5)
        return conn
    except pymysql.OperationalError:
        return None


def get_secret_dict(service_client, arn, stage, token=None):
    """Gets the secret dictionary corresponding for the secret arn, stage, and token

    This helper function gets credentials for the arn and stage passed in and returns the dictionary by parsing the JSON string

    Args:
        service_client (client): The secrets manager service client

        arn (string): The secret ARN or other identifier

        token (string): The ClientRequestToken associated with the secret version, or None if no validation is desired

        stage (string): The stage identifying the secret version

    Returns:
        SecretDictionary: Secret dictionary

    Raises:
        ResourceNotFoundException: If the secret with the specified arn and stage does not exist

        ValueError: If the secret is not valid JSON

    """
    required_fields = ['host', 'username', 'password']

    # Only do VersionId validation against the stage if a token is passed in
    if token:
        secret = service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage=stage)
    else:
        secret = service_client.get_secret_value(SecretId=arn, VersionStage=stage)
    plaintext = secret['SecretString']
    secret_dict = json.loads(plaintext)

    # Run validations against the secret
    if 'engine' not in secret_dict or secret_dict['engine'] != 'mysql':
        raise KeyError("Database engine must be set to 'mysql' in order to use this rotation lambda")
    for field in required_fields:
        if field not in secret_dict:
            raise KeyError("%s key is missing from secret JSON" % field)

    # Parse and return the secret JSON string
    return secret_dict


def get_password_option(version):
    """Gets the password option template string to use for the SET PASSWORD sql query

    This helper function takes in the mysql version and returns the appropriate password option template string that can
    be used in the SET PASSWORD query for that mysql version.

    Args:
        version (string): The mysql database version

    Returns:
        PasswordOption: The password option string

    """
    if version.startswith("8"):
        return "%s"
    else:
        return "PASSWORD(%s)"
EOF

## Zip the lambda function code file
zip myfunction.zip myfunction.py

## Create package dependency (pymysql) zip
mkdir -p temp/python
cd temp/python
pip install pymysql -t .
cd ..
zip -r9 ../pymysql.zip .

## Create a lambda layer containing the package
aws lambda publish-layer-version \
  --layer-name pymysql \
  --description "pymysql for mysql access" \
  --zip-file fileb://../pymysql.zip \
  --compatible-runtimes python3.8
cd ..

## Create assume role policy definition
cat <<'EOF'> lambda_assume_role_policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "mylambdapolicy",
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

## Create IAM role
aws iam create-role \
  --role-name lambda_iam_role \
  --assume-role-policy-document file://lambda_assume_role_policy.json

## Get the role ARN and lambda layer ARN
ACCOUNT_ID=$(aws sts get-caller-identity | jq -r .Account) &&
LAMBDA_IAM_ROLE_ARN=arn:aws:iam::$ACCOUNT_ID:role/lambda_iam_role &&
LAMBDA_LAYER_ARN=$(aws lambda list-layer-versions \
  --layer-name pymysql | jq -r '.LayerVersions[0].LayerVersionArn') &&
echo $LAMBDA_IAM_ROLE_ARN &&
echo $LAMBDA_LAYER_ARN

## Create the lambda function
## (change the region in SECRETS_MANAGER_ENDPOINT if you are not using ap-south-1)
aws lambda create-function \
  --function-name myfunction \
  --runtime python3.8 \
  --zip-file fileb://myfunction.zip \
  --handler myfunction.lambda_handler \
  --environment 'Variables={SECRETS_MANAGER_ENDPOINT=https://secretsmanager.ap-south-1.amazonaws.com}' \
  --role $LAMBDA_IAM_ROLE_ARN \
  --layers $LAMBDA_LAYER_ARN

## Provide permission to secret manager to invoke this lambda
aws lambda add-permission \
  --function-name myfunction \
  --action lambda:InvokeFunction \
  --statement-id secretsmanager \
  --principal secretsmanager.amazonaws.com

## Get lambda function arn
LAMBDA_ARN=$(aws lambda get-function \
  --function-name myfunction | jq -r .Configuration.FunctionArn) &&
echo $LAMBDA_ARN

## Create IAM policy definition for lambda
cat <<'EOF'> lambda_iam_policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:*",
        "logs:*",
        "rds:*"
      ],
      "Resource": "*"
    }
  ]
}
EOF

## Update the IAM role with the above IAM policy
aws iam put-role-policy \
  --role-name lambda_iam_role \
  --policy-name lambda_iam_policy \
  --policy-document file://lambda_iam_policy.json
Note: If you are using a different RDS engine or a different type of secret, you need to change the Lambda function accordingly. AWS provides a set of rotation function templates that you can use as a reference.
Note: The above Lambda IAM role policy is overly permissive. You are welcome to refine the permissions as you need, and please leave a comment so that I can update this post and others can benefit too.
Now we are all set to enable auto rotation of RDS secrets in Secrets Manager.
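As a starting point for tightening the policy, the sketch below builds a policy document that allows only the Secrets Manager calls the rotation function in this post actually makes (describe_secret, get_secret_value, put_secret_value, update_secret_version_stage and get_random_password), plus basic CloudWatch Logs access. Treat it as an assumption to verify in your own account: the function name and the example ARN are hypothetical, and it leaves out the `rds:*` actions because the function talks to MySQL over the network rather than through the RDS API.

```python
import json


def build_rotation_policy(secret_arn: str) -> dict:
    """Return a narrower IAM policy document for the rotation Lambda role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Calls scoped to the single secret being rotated
                "Effect": "Allow",
                "Action": [
                    "secretsmanager:DescribeSecret",
                    "secretsmanager:GetSecretValue",
                    "secretsmanager:PutSecretValue",
                    "secretsmanager:UpdateSecretVersionStage",
                ],
                "Resource": secret_arn,
            },
            {
                # GetRandomPassword is not tied to a specific secret
                "Effect": "Allow",
                "Action": "secretsmanager:GetRandomPassword",
                "Resource": "*",
            },
            {
                # Lets the function write its execution logs
                "Effect": "Allow",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                ],
                "Resource": "*",
            },
        ],
    }


if __name__ == "__main__":
    # Hypothetical ARN, replace with the ARN returned by create-secret
    example_arn = ("arn:aws:secretsmanager:ap-south-1:123456789012:"
                   "secret:mysqldbsecrets-AbCdEf")
    print(json.dumps(build_rotation_policy(example_arn), indent=2))
```

The printed JSON can be saved as lambda_iam_policy.json and passed to the put-role-policy command in place of the wildcard policy.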
Step 4: Enable auto rotation of secrets for the AWS RDS instance in AWS Secrets Manager.
## -------------------------------
## Enable Auto-Rotation of secrets
## -------------------------------

## Create auto rotation configuration
aws secretsmanager rotate-secret \
  --secret-id mysqldbsecrets \
  --rotation-lambda-arn $LAMBDA_ARN \
  --rotation-rules AutomaticallyAfterDays=30

## Get secret version details
aws secretsmanager list-secret-version-ids \
  --secret-id mysqldbsecrets \
  --include-deprecated

## Get current version secret value
aws secretsmanager get-secret-value \
  --secret-id mysqldbsecrets \
  --version-stage AWSCURRENT | jq -r .SecretString

## Replace the placeholder with the "password" value from the above output
DB_ADMIN_PASSWORD='REPLACE_WITH_NEW_PASSWORD'

## Test db connection
mysql -u $DB_ADMIN_USERNAME -p$DB_ADMIN_PASSWORD -h $DB_HOST $DB_NAME

quit;
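The staging labels that list-secret-version-ids prints can be confusing at first. The sketch below simulates, on a plain dictionary, what the finishSecret step does to them: AWSCURRENT moves to the pending version, and Secrets Manager moves AWSPREVIOUS to the version that used to be current. It is a simplified model under that assumption, it makes no AWS calls, and it leaves the AWSPENDING label untouched because the function in this post never removes it.

```python
def promote_pending(stages: dict, token: str) -> dict:
    """Return a copy of the version-to-labels mapping after finishSecret.

    stages maps a version id to its list of staging labels, in the same
    shape as VersionIdsToStages from describe_secret.
    """
    updated = {version: list(labels) for version, labels in stages.items()}
    if "AWSPENDING" not in updated.get(token, []):
        raise ValueError(f"version {token} is not staged as AWSPENDING")
    if "AWSCURRENT" in updated[token]:
        return updated  # already promoted, nothing to do

    old_current = next(v for v, labels in updated.items()
                       if "AWSCURRENT" in labels)
    # The version that held AWSPREVIOUS loses it and becomes deprecated
    for labels in updated.values():
        if "AWSPREVIOUS" in labels:
            labels.remove("AWSPREVIOUS")
    updated[old_current].remove("AWSCURRENT")
    updated[old_current].append("AWSPREVIOUS")
    updated[token].append("AWSCURRENT")
    return updated
```

A version left with no labels is what the `--include-deprecated` flag makes visible in the listing.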
We have successfully enabled auto-rotation of RDS instance secrets in AWS Secrets Manager.
If you get any errors, check the Lambda execution logs in CloudWatch to troubleshoot them.
Next, we will delete all the resources created in this demo to avoid any additional cost.
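When reading the logs, it helps to remember that Secrets Manager invokes the function once per rotation step, with the event keys listed in the handler's docstring. The helper below rebuilds those four events so you can compare them with what you see in CloudWatch or replay a single step in a test. The function name and the example values are made up for this sketch.

```python
# Order in which the rotation steps are invoked
ROTATION_STEPS = ("createSecret", "setSecret", "testSecret", "finishSecret")


def build_rotation_events(secret_arn: str, token: str) -> list:
    """Return one event per rotation step, in invocation order."""
    return [
        {"SecretId": secret_arn, "ClientRequestToken": token, "Step": step}
        for step in ROTATION_STEPS
    ]


if __name__ == "__main__":
    # Hypothetical identifiers, for illustration only
    for event in build_rotation_events("mysqldbsecrets", "example-token"):
        print(event["Step"], event)
```

The log messages in this post's function are prefixed with the step name (for example "setSecret:"), so the failing step is usually easy to spot.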
Step 5: Clean up.
## --------
## Clean Up
## --------

## Delete lambda function
aws lambda delete-function \
  --function-name myfunction

## Delete lambda layer
aws lambda delete-layer-version \
  --layer-name pymysql \
  --version-number 1

## Delete IAM Role & Policy
aws iam delete-role-policy \
  --role-name lambda_iam_role \
  --policy-name lambda_iam_policy &&
aws iam delete-role \
  --role-name lambda_iam_role

## Delete the secret
aws secretsmanager delete-secret \
  --secret-id mysqldbsecrets \
  --force-delete-without-recovery

## Delete the mysql rds instance
aws rds delete-db-instance \
  --db-instance-identifier $DB_INSTANCE_NAME \
  --skip-final-snapshot \
  --delete-automated-backups
Hope you have enjoyed this article. To know more about AWS Secrets Manager, please refer to the official documentation below.
https://docs.aws.amazon.com/secretsmanager/index.html