Reboot MSK Broker with AWS Fault Injection Service

Customers have asked me how they can use AWS Fault Injection Service (FIS) to disrupt Amazon Managed Streaming for Apache Kafka (Amazon MSK) by rebooting brokers. In this blog post, I'll demonstrate how to do that.

I'll show you two flows for rebooting an MSK broker through FIS:

Flow 1: FIS => SSM => MSK reboot broker
Flow 2: FIS => SSM => Lambda => MSK reboot broker

Let's start by creating the IAM roles that FIS, SSM, and Lambda need.

1. If you haven't created an FIS role yet, create one now, starting with the FIS trust policy.

aws iam create-role \
  --role-name ssm-fis-role \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                  "fis.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}'
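Every role in this walkthrough is created with a trust policy of this same shape; only the service principal changes (fis.amazonaws.com, ssm.amazonaws.com, lambda.amazonaws.com). As a minimal sketch, assuming you would rather generate these documents than hand-edit JSON, the hypothetical helpers below build a trust policy for a given principal and run a rough check that a document really is a trust policy: every statement names a Principal and allows sts:AssumeRole. A permissions policy (one that only lists actions such as kafka:RebootBroker) fails that check, and IAM rejects it as a trust policy because it has no Principal.

```python
import json


def trust_policy(service_principal: str) -> dict:
    """Return an IAM trust policy that lets one AWS service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": [service_principal]},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def looks_like_trust_policy(doc: dict) -> bool:
    """Heuristic: every statement has a Principal and allows sts:AssumeRole."""
    statements = doc.get("Statement", [])
    if isinstance(statements, dict):
        statements = [statements]
    if not statements:
        return False
    for stmt in statements:
        actions = stmt.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        if "Principal" not in stmt or "sts:AssumeRole" not in actions:
            return False
    return True


if __name__ == "__main__":
    # Print one trust policy per service used in this post.
    for principal in ("fis.amazonaws.com", "ssm.amazonaws.com", "lambda.amazonaws.com"):
        print(json.dumps(trust_policy(principal)))
```

The printed JSON can be passed to --assume-role-policy-document in place of the hand-written documents.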

2. Having insights into experiments is key. Therefore, we need to ensure that FIS can write to CloudWatch Logs.

aws iam put-role-policy \
  --role-name ssm-fis-role \
  --policy-name fis-cloudwatch \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "logs:CreateLogDelivery",
          "logs:PutResourcePolicy",
          "logs:DescribeResourcePolicies",
          "logs:DescribeLogGroups"
        ],
        "Resource": "*"
      }
    ]
  }'

3. Next, add SSM permissions to the FIS role so that it can start SSM Automation documents. We can attach the AWS managed policy AWSFaultInjectionSimulatorSSMAccess.

aws iam attach-role-policy --role-name ssm-fis-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSFaultInjectionSimulatorSSMAccess

——————————————————————————————————————————————————————————————————————————-
If you intend to use flow 1 (FIS => SSM => MSK, with no Lambda in the path), follow step 4a. For flow 2, follow step 4b.
——————————————————————————————————————————————————————————————————————————-

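4a. Create the role that the flow 1 SSM Document assumes to reboot the broker. SSM must be allowed to assume it, so create it with an SSM trust policy and then attach the MSK permissions. Replace the cluster ARNs with your own. Once created, follow step 9a.

aws iam create-role \
  --role-name SSMDocumentMskRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": { "Service": "ssm.amazonaws.com" },
        "Action": "sts:AssumeRole"
      }
    ]
  }'

aws iam put-role-policy \
  --role-name SSMDocumentMskRole \
  --policy-name msk-reboot-broker \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": ["kafka:DescribeCluster", "kafka:RebootBroker"],
        "Resource": [
          "arn:aws:kafka:us-east-1:123456789101:cluster/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3",
          "arn:aws:kafka:us-east-1:123456789101:cluster/demo-cluster-2/7e173d6b-fc7c-4504-b401-3813ebc6d495-3"
        ]
      },
      {
        "Effect": "Allow",
        "Action": "kafka:DescribeClusterOperation",
        "Resource": "*"
      }
    ]
  }'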

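4b. Flow 2 triggers the broker reboot by having SSM invoke a Lambda function. Create the role that the flow 2 SSM Document assumes: it needs an SSM trust policy and permission to invoke the function.

aws iam create-role \
  --role-name SSMDocumentLambdaRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": { "Service": "ssm.amazonaws.com" },
        "Action": "sts:AssumeRole"
      }
    ]
  }'

aws iam put-role-policy \
  --role-name SSMDocumentLambdaRole \
  --policy-name ssm-invoke-lambda \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": "lambda:InvokeFunction",
        "Resource": "arn:aws:lambda:us-east-1:123456789101:function:reboot-msk-broker"
      }
    ]
  }'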

5. To finish creating the roles for this experiment, create the Lambda execution role with a trust policy that allows Lambda to assume it.

aws iam create-role \
  --role-name LambdaMskRebootBrokerexecutionRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                  "lambda.amazonaws.com"
                ]
            },
            "Action": "sts:AssumeRole"
        }
    ]
}'

6. Next, give the role permission to write to Amazon CloudWatch Logs and to call Amazon MSK from Lambda.

aws iam put-role-policy \
  --role-name LambdaMskRebootBrokerexecutionRole \
  --policy-name msk-fis-cloudwatch \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Sid": "RebootBroker",
            "Effect": "Allow",
            "Action": [
                "kafka:DescribeCluster",
                "kafka:RebootBroker"
            ],
            "Resource": [
                "arn:aws:kafka:us-east-1:123456789101:cluster/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3",
                "arn:aws:kafka:us-east-1:123456789101:cluster/demo-cluster-2/7e173d6b-fc7c-4504-b401-3813ebc6d495-3"
            ]
        },
        {
            "Sid": "DescribeClusterOperation",
            "Effect": "Allow",
            "Action": "kafka:DescribeClusterOperation",
            "Resource": "*"
        }
    ]
}'
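Scoping a policy to specific clusters means pasting full cluster ARNs, and a typo there typically surfaces only later, when a call is denied. The hypothetical helper below builds a scoped Allow statement and first rejects anything that does not look like an MSK cluster ARN. The regular expression is an assumption about the ARN shape (arn:aws:kafka:<region>:<account-id>:cluster/<cluster-name>/<uuid>), based on the ARNs used in this post.

```python
import json
import re

# Assumed MSK cluster ARN shape, based on the examples in this post:
# arn:aws:kafka:<region>:<12-digit account>:cluster/<cluster-name>/<uuid-suffix>
CLUSTER_ARN_RE = re.compile(
    r"^arn:aws:kafka:[a-z0-9-]+:\d{12}:cluster/[A-Za-z0-9-]+/[A-Za-z0-9-]+$"
)


def scoped_msk_statement(actions, cluster_arns):
    """Build an Allow statement limited to the given cluster ARNs (hypothetical helper)."""
    if not cluster_arns:
        raise ValueError("at least one cluster ARN is required")
    bad = [arn for arn in cluster_arns if not CLUSTER_ARN_RE.match(arn)]
    if bad:
        raise ValueError(f"not an MSK cluster ARN: {bad}")
    return {"Effect": "Allow", "Action": list(actions), "Resource": list(cluster_arns)}


def policy_document(*statements):
    """Wrap statements in a JSON policy document suitable for --policy-document."""
    return json.dumps({"Version": "2012-10-17", "Statement": list(statements)}, indent=2)
```

Validating before calling put-role-policy keeps the mistake local instead of turning it into an AccessDenied error during the experiment.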

7. Create the Lambda function that triggers the MSK broker reboot.

cat > reboot-msk-broker.py << EOF
import json
import boto3

# Create the client once so warm invocations can reuse it
client = boto3.client('kafka')


def lambda_handler(event, context):
    # Flow 2 sends the payload as a JSON-encoded string; accept a dict as well
    payload = json.loads(event) if isinstance(event, str) else event

    cluster_arn = payload['clusterArn']
    broker_id = str(payload['brokerId'])
    print(f"ClusterArn: {cluster_arn}")
    print(f"BrokerId: {broker_id}")

    # Reboot the broker
    response_mskcluster = client.reboot_broker(
        BrokerIds=[broker_id],
        ClusterArn=cluster_arn
    )
    cluster_operation_arn = response_mskcluster['ClusterOperationArn']

    # Look up the state of the reboot operation that was just started
    response_operation = client.describe_cluster_operation(
        ClusterOperationArn=cluster_operation_arn
    )
    operation_info = response_operation['ClusterOperationInfo']

    return {
        'brokerId': broker_id,
        'cluster_operation_state': operation_info['OperationState'],
        'cluster_operation_type': operation_info['OperationType'],
        'cluster_operation_arn': cluster_operation_arn
    }
EOF
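Before deploying, you can exercise the handler logic locally without touching a real cluster. The sketch below assumes you factor the logic into a function that takes the Kafka client as an argument, and it accepts the event either as a dict or as the JSON-encoded string that flow 2 sends. FakeKafkaClient is a hypothetical stub that mimics the two boto3 calls the handler makes (reboot_broker and describe_cluster_operation) and returns canned data, so no AWS credentials are needed.

```python
import json


def parse_event(event):
    """Return (cluster_arn, broker_id) from a dict or a JSON-encoded string event."""
    if isinstance(event, (str, bytes)):
        event = json.loads(event)
    return event["clusterArn"], str(event["brokerId"])


def reboot_and_describe(client, event):
    """Same steps as lambda_handler, with the Kafka client passed in for testing."""
    cluster_arn, broker_id = parse_event(event)
    response = client.reboot_broker(BrokerIds=[broker_id], ClusterArn=cluster_arn)
    operation_arn = response["ClusterOperationArn"]
    info = client.describe_cluster_operation(ClusterOperationArn=operation_arn)["ClusterOperationInfo"]
    return {
        "brokerId": broker_id,
        "cluster_operation_state": info["OperationState"],
        "cluster_operation_type": info["OperationType"],
        "cluster_operation_arn": operation_arn,
    }


class FakeKafkaClient:
    """Hypothetical stub for boto3.client('kafka'): records calls, returns canned responses."""

    def __init__(self):
        self.calls = []

    def reboot_broker(self, BrokerIds, ClusterArn):
        self.calls.append(("reboot_broker", BrokerIds, ClusterArn))
        operation_arn = ClusterArn.replace(":cluster/", ":cluster-operation/") + "/example-op"
        return {"ClusterArn": ClusterArn, "ClusterOperationArn": operation_arn}

    def describe_cluster_operation(self, ClusterOperationArn):
        self.calls.append(("describe_cluster_operation", ClusterOperationArn))
        return {"ClusterOperationInfo": {"OperationState": "PENDING", "OperationType": "REBOOT_NODE"}}
```

In the deployed function you would pass boto3.client('kafka') instead of the stub.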

8. Zip up reboot-msk-broker.py so that you can upload it to Lambda in the next step.

zip reboot-msk-broker.zip reboot-msk-broker.py
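If you would rather build the package from Python than with the zip CLI, the sketch below uses the standard zipfile module. It places the handler file at the root of the archive, where Lambda looks for the module named in --handler, and marks it world-readable (0644), since Lambda needs read access to code files. The function names are hypothetical.

```python
import io
import zipfile


def build_lambda_zip(handler_name: str, source_code: str) -> bytes:
    """Return a zip archive (as bytes) containing one handler file at the root."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        info = zipfile.ZipInfo(handler_name, date_time=(1980, 1, 1, 0, 0, 0))
        info.compress_type = zipfile.ZIP_DEFLATED
        info.create_system = 3  # Unix, so the mode bits below are honored
        info.external_attr = 0o100644 << 16  # regular file, rw-r--r--
        zf.writestr(info, source_code)
    return buf.getvalue()


def handler_at_root(zip_bytes: bytes, handler_name: str) -> bool:
    """True if the handler file sits at the archive root (no directory prefix)."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        return handler_name in zf.namelist()
```

For example, reading reboot-msk-broker.py, passing its text to build_lambda_zip, and writing the returned bytes to reboot-msk-broker.zip produces a package equivalent to the zip command above.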

Now create the Lambda function. Replace the account ID in the role ARN with your own.

aws lambda create-function \
  --function-name reboot-msk-broker \
  --runtime python3.11 \
  --role arn:aws:iam::123456789101:role/LambdaMskRebootBrokerexecutionRole \
  --handler reboot-msk-broker.lambda_handler \
  --zip-file fileb://reboot-msk-broker.zip

9a. Flow 1: Create the SSM Document that FIS will trigger. Replace the assumeRole ARN with the ARN of your SSMDocumentMskRole, then go to step 10.

cat > RebootMskBroker.yml << EOF
description: |
  ---
  # Reboot MSK Broker
  This SSM Document calls the MSK RebootBroker API directly from an aws:executeScript step (no Lambda involved)
  ## Input parameters

  1. The MSK **ClusterARN**
  2. The **BrokerID** to reboot

  To read more about [msk-reboot-broker](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-reboot-broker.html#clusters-clusterarn-reboot-broker-prop-rebootbrokerrequest-brokerids) consult the AWS Documentation. 

  ## Output parameters example
  * "brokerId": "3"
  *  "cluster_operation_state": "PENDING"
  * "cluster_operation_type": "REBOOT_NODE"
  * "cluster_operation_arn": "arn:aws:kafka:us-east-1:123456789012:cluster-operation/demo-cluster-2/7e173d6b-fc7c-4504-b401-3813ebc6d495-3/ee9294d5-bd9a-42c2-a951-f62115092407"

  You can use the cluster_operation_arn to query the broker's state during its recovery via the AWS CLI:

  >aws kafka describe-cluster-operation --cluster-operation-arn **arn:aws:kafka:us-east-1:123456789012:cluster-operation/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3/c21d622d-caf9-4df5-80fe-80f04ddd8816**
schemaVersion: '0.3'
assumeRole: 'arn:aws:iam::123456789101:role/SSMDocumentMskRole'
parameters:
  clusterArn:
    type: String
    description: The ARN of the MSK cluster to reboot
  brokerId:
    type: String
    description: The ID of the MSK broker to reboot
mainSteps:
  - name: RebootBroker
    action: 'aws:executeScript'
    inputs:
      Runtime: python3.8
      Handler: reboot_msk_broker
      Script: |-
        import json
        import boto3

        def reboot_msk_broker(event, context):
            # Read the cluster ARN and broker ID from InputPayload
            client = boto3.client('kafka')

            print(f"ClusterArn: {event['clusterArn']}")
            print(f"BrokerId: {event['brokerId']}")

            cluster_arn = event['clusterArn']
            broker_id = event['brokerId']

            # Reboot the broker
            response_mskcluster = client.reboot_broker(
            BrokerIds=[
                broker_id,
            ],
            ClusterArn=cluster_arn
            )

            cluster_arn = response_mskcluster['ClusterArn']
            cluster_operation_arn = response_mskcluster['ClusterOperationArn']
           
            response_operation = client.describe_cluster_operation(ClusterOperationArn=cluster_operation_arn)
            cluster_operation_state = response_operation['ClusterOperationInfo']['OperationState']
            cluster_operation_type = response_operation['ClusterOperationInfo']['OperationType']
           
            return {
                'brokerId': broker_id,
                "cluster_operation_state": cluster_operation_state,
                "cluster_operation_type": cluster_operation_type,
                "cluster_operation_arn" : cluster_operation_arn
            }
      InputPayload:
        clusterArn: '{{ clusterArn }}'
        brokerId: '{{ brokerId }}'
EOF

9b. Flow 2: Create the SSM Document that FIS will trigger. Replace the assumeRole ARN with the ARN of your SSMDocumentLambdaRole, and FunctionName with the ARN of the reboot-msk-broker function you created in step 8.

cat > RebootMskBroker.yml << EOF
description: |
  ---
  # Reboot MSK Broker
  This SSM Document executes the MSK reboot-broker API via Lambda
  ## Input parameters

  1. The MSK **ClusterARN**
  2. The **BrokerID** to reboot

  To read more about [msk-reboot-broker](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-reboot-broker.html#clusters-clusterarn-reboot-broker-prop-rebootbrokerrequest-brokerids) consult the AWS Documentation. 

  ## Output parameters example
  * "brokerId": "3"
  *  "cluster_operation_state": "PENDING"
  * "cluster_operation_type": "REBOOT_NODE"
  * "cluster_operation_arn": "arn:aws:kafka:us-east-1:123456789012:cluster-operation/demo-cluster-2/7e173d6b-fc7c-4504-b401-3813ebc6d495-3/ee9294d5-bd9a-42c2-a951-f62115092407"

  You can use the cluster_operation_arn to query the broker's state during its recovery via the AWS CLI:

  >aws kafka describe-cluster-operation --cluster-operation-arn **arn:aws:kafka:us-east-1:123456789012:cluster-operation/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3/c21d622d-caf9-4df5-80fe-80f04ddd8816**
schemaVersion: '0.3'
assumeRole: 'arn:aws:iam::123456789101:role/SSMDocumentLambdaRole'
parameters:
  clusterArn:
    type: String
    description: The ARN of the MSK cluster to reboot
  brokerId:
    type: String
    description: The ID of the MSK broker to reboot
mainSteps:
  - name: RebootBroker
    action: 'aws:invokeLambdaFunction'
    inputs:
      InvocationType: RequestResponse
      Payload: '"{\"clusterArn\": \"{{clusterArn}}\", \"brokerId\":\"{{brokerId}}\"}"'
      FunctionName: 'arn:aws:lambda:us-east-1:123456789101:function:reboot-msk-broker'
    outputs:
      - Name: brokerId
        Selector: $.Payload.brokerId
        Type: String
      - Name: cluster_operation_state
        Selector: $.Payload.cluster_operation_state
        Type: String
      - Name: cluster_operation_type
        Selector: $.Payload.cluster_operation_type
        Type: String
      - Name: cluster_operation_arn
        Selector: $.Payload.cluster_operation_arn
        Type: String
EOF
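The Payload in the RebootBroker step is double-encoded: the outer double quotes make it a JSON string whose content is itself JSON, which is why the Lambda function in step 7 calls json.loads on the event. The sketch below reproduces the two decoding steps with the standard json module. render is a hypothetical stand-in for SSM's {{parameter}} substitution (no-space form only), and the ARN is the example cluster used in this post.

```python
import json

# Payload from the RebootBroker step with the YAML single quotes removed.
# YAML keeps backslashes literally inside single-quoted strings.
PAYLOAD_TEMPLATE = r'"{\"clusterArn\": \"{{clusterArn}}\", \"brokerId\":\"{{brokerId}}\"}"'


def render(template: str, **params: str) -> str:
    """Hypothetical stand-in for SSM parameter substitution."""
    for name, value in params.items():
        template = template.replace("{{" + name + "}}", value)
    return template


def decode_like_lambda(payload: str) -> dict:
    """Decode the payload the way the flow 2 Lambda sees it."""
    event = json.loads(payload)  # first decode: the Lambda runtime turns the payload into `event`
    if not isinstance(event, str):
        raise TypeError("expected the double-encoded payload to decode to a string")
    return json.loads(event)  # second decode: the json.loads call inside lambda_handler
```

Sending a plain JSON object as the Payload would avoid the second decode, but then the handler must accept a dict instead of a string.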

10. Create the SSM Automation document from RebootMskBroker.yml. It takes clusterArn and brokerId as input parameters and is called by the FIS experiment.

aws ssm create-document \
    --content file://RebootMskBroker.yml \
    --name "RebootMskBrokerNow" \
    --document-type "Automation" \
    --document-format YAML
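Before wiring in FIS, you may want to run the document once on its own to confirm the roles and parameters work. Be aware that this really reboots the broker. A minimal sketch using boto3's SSM client calls start_automation_execution and get_automation_execution; note that SSM Automation expects each parameter value as a list of strings. The client is passed in so the functions can be exercised with a stub, and the function names are hypothetical.

```python
def start_reboot_automation(ssm_client, cluster_arn, broker_id, document_name="RebootMskBrokerNow"):
    """Start the Automation document directly, bypassing FIS, and return the execution ID."""
    response = ssm_client.start_automation_execution(
        DocumentName=document_name,
        # SSM Automation expects every parameter value as a list of strings.
        Parameters={"clusterArn": [cluster_arn], "brokerId": [str(broker_id)]},
    )
    return response["AutomationExecutionId"]


def automation_status(ssm_client, execution_id):
    """Return the execution status, for example InProgress, Success or Failed."""
    response = ssm_client.get_automation_execution(AutomationExecutionId=execution_id)
    return response["AutomationExecution"]["AutomationExecutionStatus"]
```

With a real client, boto3.client('ssm') takes the place of the stub.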

11. Create the FIS experiment template. Note the template ID in the response, because you'll need it in the next step.

aws fis create-experiment-template \
    --cli-input-json '{
        "description": "RebootMSKBroker",
        "targets": {},
        "actions": {
            "RebootMSK": {
                "actionId": "aws:ssm:start-automation-execution",
                "parameters": {
                    "documentArn": "arn:aws:ssm:us-east-1:123456789101:document/RebootMskBrokerNow",
                    "documentParameters": "{\"clusterArn\": \"arn:aws:kafka:us-east-1:123456789101:cluster/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3\", \"brokerId\": \"3\"}",
                    "maxDuration": "PT1M"
                }
            }
        },
        "stopConditions": [
            {
                "source": "none"
            }
        ],
        "roleArn": "arn:aws:iam::123456789101:role/ssm-fis-role",
        "tags": {
            "Name": "RebootMskBroker1"
        },
        "logConfiguration": {
            "cloudWatchLogsConfiguration": {
                "logGroupArn": "arn:aws:logs:us-east-1:123456789101:log-group:/aws/fis/experiments:*"
            },
            "logSchemaVersion": 2
        }
    }'
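Hand-escaping documentParameters is error-prone, because the value has to be a JSON string nested inside the JSON template. A minimal sketch, assuming the placeholder account ID and Region used in this post, builds the template as a Python dict and lets json.dumps do the escaping.

```python
import json

ACCOUNT = "123456789101"  # placeholder account ID used in this post
REGION = "us-east-1"


def fis_template(cluster_arn: str, broker_id: str) -> dict:
    """Build the experiment template from step 11 as a dict."""
    # documentParameters must itself be a JSON string, so serialize it separately.
    document_parameters = json.dumps({"clusterArn": cluster_arn, "brokerId": broker_id})
    return {
        "description": "RebootMSKBroker",
        "targets": {},
        "actions": {
            "RebootMSK": {
                "actionId": "aws:ssm:start-automation-execution",
                "parameters": {
                    "documentArn": f"arn:aws:ssm:{REGION}:{ACCOUNT}:document/RebootMskBrokerNow",
                    "documentParameters": document_parameters,
                    "maxDuration": "PT1M",
                },
            }
        },
        "stopConditions": [{"source": "none"}],
        "roleArn": f"arn:aws:iam::{ACCOUNT}:role/ssm-fis-role",
        "tags": {"Name": "RebootMskBroker1"},
        "logConfiguration": {
            "cloudWatchLogsConfiguration": {
                "logGroupArn": f"arn:aws:logs:{REGION}:{ACCOUNT}:log-group:/aws/fis/experiments:*"
            },
            "logSchemaVersion": 2,
        },
    }


if __name__ == "__main__":
    cluster = f"arn:aws:kafka:{REGION}:{ACCOUNT}:cluster/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3"
    print(json.dumps(fis_template(cluster, "3"), indent=2))
```

Saving this as make_fis_template.py (a hypothetical name), running python make_fis_template.py > template.json, and then aws fis create-experiment-template --cli-input-json file://template.json creates the same template as step 11.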

12. Start the experiment from the command line, using the template ID from step 11.

aws fis start-experiment \
    --experiment-template-id EXTWTgEXTuhavDn

You can now go to the MSK console, where you should see your broker rebooting.
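If you would rather follow the reboot from a script than from the console, you can poll DescribeClusterOperation with the cluster_operation_arn that the SSM Document returns. The sketch below takes the boto3 Kafka client as a parameter. It assumes UPDATE_COMPLETE and UPDATE_FAILED are the terminal values of OperationState (check the MSK API reference), and the polling interval and timeout are arbitrary defaults; sleep and clock are injectable only to make the function testable.

```python
import time

# Assumed terminal values of ClusterOperationInfo.OperationState.
TERMINAL_STATES = {"UPDATE_COMPLETE", "UPDATE_FAILED"}


def wait_for_operation(client, operation_arn, poll_seconds=15.0, timeout_seconds=1800.0,
                       sleep=time.sleep, clock=time.monotonic):
    """Poll until the operation reaches a terminal state; return the distinct states seen."""
    deadline = clock() + timeout_seconds
    seen = []
    while True:
        response = client.describe_cluster_operation(ClusterOperationArn=operation_arn)
        state = response["ClusterOperationInfo"]["OperationState"]
        if not seen or seen[-1] != state:
            seen.append(state)
        if state in TERMINAL_STATES:
            return seen
        if clock() >= deadline:
            raise TimeoutError(f"{operation_arn} still {state} after {timeout_seconds}s")
        sleep(poll_seconds)
```

In practice you would call wait_for_operation(boto3.client('kafka'), operation_arn) with the cluster_operation_arn from the automation output.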