Customers have asked me how they can use AWS Fault Injection Service (FIS) to impact Amazon Managed Streaming for Apache Kafka (Amazon MSK) by rebooting brokers. In this blog post, I’ll demonstrate how to do that.
I will show you two flows for rebooting an MSK broker via FIS, using AWS Systems Manager (SSM) and, optionally, AWS Lambda.
Flow 1: FIS => SSM => MSK reboot broker
Flow 2: FIS => SSM => Lambda => MSK reboot broker
Let’s first create the IAM roles that FIS, SSM and Lambda need.
1. If you have not created a FIS role yet, you’ll have to create one. We will start with the FIS trust policy.
aws iam create-role \
  --role-name ssm-fis-role \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": { "Service": [ "fis.amazonaws.com" ] },
        "Action": "sts:AssumeRole"
      }
    ]
  }'
2. Having insights into experiments is key. Therefore, we need to ensure that FIS can write to CloudWatch Logs.
aws iam put-role-policy \
  --role-name ssm-fis-role \
  --policy-name fis-cloudwatch \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "logs:CreateLogDelivery",
          "logs:PutResourcePolicy",
          "logs:DescribeResourcePolicies",
          "logs:DescribeLogGroups"
        ],
        "Resource": "*"
      }
    ]
  }'
3. Next we will add SSM permissions to the FIS role so that it can run SSM documents. We can attach the AWS managed policy AWSFaultInjectionSimulatorSSMAccess.
aws iam attach-role-policy --role-name ssm-fis-role --policy-arn arn:aws:iam::aws:policy/service-role/AWSFaultInjectionSimulatorSSMAccess
——————————————————————————————————————————————————————————————————————————-
If you intend to use flow 1, which reboots the MSK broker through FIS => SSM without Lambda in the path, follow step 4a. For flow 2, use step 4b.
——————————————————————————————————————————————————————————————————————————-
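4a. Create the SSM role to execute the MSK broker reboot. The role needs a trust policy that lets SSM assume it, plus a permissions policy for the MSK calls (the policy name msk-reboot-broker is an arbitrary choice). Once created, continue with step 9a.
aws iam create-role \
  --role-name SSMDocumentMskRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": { "Service": [ "ssm.amazonaws.com" ] },
        "Action": "sts:AssumeRole"
      }
    ]
  }'

aws iam put-role-policy \
  --role-name SSMDocumentMskRole \
  --policy-name msk-reboot-broker \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "kafka:DescribeCluster",
          "kafka:RebootBroker",
          "kafka:DescribeClusterOperation"
        ],
        "Resource": [
          "arn:aws:kafka:us-east-1:123456789101:cluster/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3",
          "arn:aws:kafka:us-east-1:123456789101:cluster/demo-cluster-2/7e173d6b-fc7c-4504-b401-3813ebc6d495-3"
        ]
      }
    ]
  }'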
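An IAM role carries two different JSON documents: a trust policy (who may assume the role, passed to create-role) and a permissions policy (what the role may do, attached with put-role-policy). The Python sketch below builds both for the flow 1 role and prints them, which is a cheap way to catch JSON syntax problems before pasting them into the CLI. The helper names are hypothetical, and the cluster ARN is the demo placeholder used in this post.

```python
import json


def build_trust_policy(service):
    """Trust policy: lets the given AWS service principal assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": [service]},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def build_permissions_policy(actions, resources):
    """Permissions policy: what the role may do. It has no Principal element."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": list(actions),
                "Resource": list(resources),
            }
        ],
    }


# Placeholder ARN taken from the examples in this post.
DEMO_CLUSTER_ARN = (
    "arn:aws:kafka:us-east-1:123456789101:cluster/"
    "demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3"
)

trust_policy = build_trust_policy("ssm.amazonaws.com")
permissions_policy = build_permissions_policy(
    ["kafka:DescribeCluster", "kafka:RebootBroker", "kafka:DescribeClusterOperation"],
    [DEMO_CLUSTER_ARN],
)

# json.dumps always emits well-formed JSON, so the output can be pasted
# into --assume-role-policy-document and --policy-document respectively.
print(json.dumps(trust_policy, indent=2))
print(json.dumps(permissions_policy, indent=2))
```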
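4b. Flow 2 triggers the broker reboot by calling Lambda from SSM. The SSM document that invokes the Lambda function will assume this role, so it needs a trust policy for SSM and permission to call Lambda (the policy name ssm-invoke-lambda is an arbitrary choice).
aws iam create-role \
  --role-name SSMDocumentLambdaRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": { "Service": [ "ssm.amazonaws.com" ] },
        "Action": "sts:AssumeRole"
      }
    ]
  }'

aws iam put-role-policy \
  --role-name SSMDocumentLambdaRole \
  --policy-name ssm-invoke-lambda \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": [
          "lambda:InvokeFunction",
          "lambda:DeleteFunction"
        ],
        "Resource": "*"
      }
    ]
  }'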
5. To finish creating the roles for this experiment, we will create the Lambda execution role with its trust policy.
aws iam create-role \
  --role-name LambdaMskRebootBrokerexecutionRole \
  --assume-role-policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Principal": { "Service": [ "lambda.amazonaws.com" ] },
        "Action": "sts:AssumeRole"
      }
    ]
  }'
6. Next we will add permission to the role so it can make calls to Amazon MSK and Amazon CloudWatch from Lambda.
aws iam put-role-policy \
  --role-name LambdaMskRebootBrokerexecutionRole \
  --policy-name msk-fis-cloudwatch \
  --policy-document '{
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": "logs:CreateLogGroup",
        "Resource": "*"
      },
      {
        "Effect": "Allow",
        "Action": [
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ],
        "Resource": [ "*" ]
      },
      {
        "Sid": "Statement1",
        "Effect": "Allow",
        "Action": [
          "kafka:DescribeCluster",
          "kafka:RebootBroker",
          "kafka:DescribeClusterOperation"
        ],
        "Resource": [
          "arn:aws:kafka:us-east-1:123456789101:cluster/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3",
          "arn:aws:kafka:us-east-1:123456789101:cluster/demo-cluster-2/7e173d6b-fc7c-4504-b401-3813ebc6d495-3"
        ]
      }
    ]
  }'
7. Let’s create the Lambda function that will trigger the MSK broker reboot.
cat > reboot-msk-broker.py << 'EOF'
import json

import boto3


def lambda_handler(event, context):
    # The SSM document passes the payload as a JSON-encoded string,
    # so decode it into a dictionary before reading the fields.
    payload = json.loads(event) if isinstance(event, str) else event

    # Extract the cluster ARN and broker ID from the event
    cluster_arn = payload["clusterArn"]
    broker_id = payload["brokerId"]
    print(f"ClusterARN: {cluster_arn}")
    print(f"BrokerId: {broker_id}")

    client = boto3.client("kafka")

    # Reboot the broker
    response_mskcluster = client.reboot_broker(
        BrokerIds=[broker_id],
        ClusterArn=cluster_arn,
    )
    cluster_operation_arn = response_mskcluster["ClusterOperationArn"]

    # Look up the state and type of the reboot operation
    response_operation = client.describe_cluster_operation(
        ClusterOperationArn=cluster_operation_arn
    )
    operation_info = response_operation["ClusterOperationInfo"]

    return {
        "brokerId": broker_id,
        "cluster_operation_state": operation_info["OperationState"],
        "cluster_operation_type": operation_info["OperationType"],
        "cluster_operation_arn": cluster_operation_arn,
    }
EOF
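The handler decodes the event with json.loads because the SSM document in flow 2 wraps the payload in an extra pair of quotes, so Lambda receives a JSON string rather than a dictionary. Below is a sketch of that decoding step in isolation, which accepts either shape and fails early when a field is missing. The function name parse_event is hypothetical and not part of the original handler, and the ARN is the demo placeholder from this post.

```python
import json


def parse_event(event):
    """Return (cluster_arn, broker_id) from a dict or a JSON-string event."""
    payload = json.loads(event) if isinstance(event, str) else event

    missing = [key for key in ("clusterArn", "brokerId") if key not in payload]
    if missing:
        raise ValueError(f"event is missing required keys: {missing}")

    # The reboot-broker API takes broker IDs as strings, so normalize here.
    return payload["clusterArn"], str(payload["brokerId"])


DEMO_CLUSTER_ARN = (
    "arn:aws:kafka:us-east-1:123456789101:cluster/"
    "demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3"
)

# Shape 1: JSON string, as sent by the flow 2 SSM document.
as_string = json.dumps({"clusterArn": DEMO_CLUSTER_ARN, "brokerId": "3"})
# Shape 2: plain dictionary, as sent by a direct test invocation.
as_dict = {"clusterArn": DEMO_CLUSTER_ARN, "brokerId": 3}

print(parse_event(as_string))
print(parse_event(as_dict))
```

Both calls yield the same tuple, which makes the function easy to test from the Lambda console as well as through SSM.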
8. Zip up reboot-msk-broker.py so we can upload it to Lambda in the next step.
zip reboot-msk-broker.zip reboot-msk-broker.py
We will now create the Lambda function.
aws lambda create-function \
  --function-name reboot-msk-broker \
  --runtime python3.11 \
  --role arn:aws:iam::123456789101:role/LambdaMskRebootBrokerexecutionRole \
  --handler reboot-msk-broker.lambda_handler \
  --zip-file fileb://reboot-msk-broker.zip
9a. Flow 1: Create the SSM Document that we will trigger from FIS. Make sure that you replace the assumeRole ARN with the ARN for the SSMDocumentMskRole. Next go to step 10.
cat > RebootMskBroker.yml << 'EOF'
description: |
  ---
  # Reboot MSK Broker

  This SSM Document executes the MSK reboot-broker API directly from an automation script.

  ## Input parameters
  1. The MSK **ClusterARN**
  2. The **BrokerID** to reboot

  To read more about [msk-reboot-broker](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-reboot-broker.html#clusters-clusterarn-reboot-broker-prop-rebootbrokerrequest-brokerids) consult the AWS Documentation.

  ## Output parameters example
  * "brokerId": "3"
  * "cluster_operation_state": "PENDING"
  * "cluster_operation_type": "REBOOT_NODE"
  * "cluster_operation_arn": "arn:aws:kafka:us-east-1:123456789012:cluster-operation/demo-cluster-2/7e173d6b-fc7c-4504-b401-3813ebc6d495-3/ee9294d5-bd9a-42c2-a951-f62115092407"

  You can use the cluster_operation_arn to query the broker's state during its recovery via the AWS CLI

  >aws kafka describe-cluster-operation --cluster-operation-arn **arn:aws:kafka:us-east-1:123456789012:cluster-operation/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3/c21d622d-caf9-4df5-80fe-80f04ddd8816**
schemaVersion: '0.3'
assumeRole: 'arn:aws:iam::123456789101:role/SSMDocumentMskRole'
parameters:
  clusterArn:
    type: String
    description: The ARN of the MSK cluster to reboot
  brokerId:
    type: String
    description: The ID of the MSK broker to reboot
mainSteps:
  - name: RebootBroker
    action: 'aws:executeScript'
    inputs:
      Runtime: python3.8
      Handler: reboot_msk_broker
      Script: |-
        import boto3

        def reboot_msk_broker(event, context):
            # Extract the cluster ARN and broker ID from the event
            cluster_arn = event['clusterArn']
            broker_id = event['brokerId']
            print(f"ClusterARN: {cluster_arn}")
            print(f"BrokerId: {broker_id}")

            client = boto3.client('kafka')

            # Reboot the broker
            response_mskcluster = client.reboot_broker(
                BrokerIds=[broker_id],
                ClusterArn=cluster_arn
            )
            cluster_operation_arn = response_mskcluster['ClusterOperationArn']

            # Look up the state and type of the reboot operation
            response_operation = client.describe_cluster_operation(ClusterOperationArn=cluster_operation_arn)
            cluster_operation_state = response_operation['ClusterOperationInfo']['OperationState']
            cluster_operation_type = response_operation['ClusterOperationInfo']['OperationType']

            return {
                'brokerId': broker_id,
                'cluster_operation_state': cluster_operation_state,
                'cluster_operation_type': cluster_operation_type,
                'cluster_operation_arn': cluster_operation_arn
            }
      InputPayload:
        clusterArn: '{{ clusterArn }}'
        brokerId: '{{ brokerId }}'
EOF
9b. Flow 2: Create the SSM Document that we will trigger from FIS. Make sure that you replace the assumeRole ARN with the ARN for the SSMDocumentLambdaRole, and the FunctionName with the ARN of the Lambda function created in step 8.
cat > RebootMskBroker.yml << 'EOF'
description: |
  ---
  # Reboot MSK Broker

  This SSM Document executes the MSK reboot-broker API via Lambda

  ## Input parameters
  1. The MSK **ClusterARN**
  2. The **BrokerID** to reboot

  To read more about [msk-reboot-broker](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-reboot-broker.html#clusters-clusterarn-reboot-broker-prop-rebootbrokerrequest-brokerids) consult the AWS Documentation.

  ## Output parameters example
  * "brokerId": "3"
  * "cluster_operation_state": "PENDING"
  * "cluster_operation_type": "REBOOT_NODE"
  * "cluster_operation_arn": "arn:aws:kafka:us-east-1:123456789012:cluster-operation/demo-cluster-2/7e173d6b-fc7c-4504-b401-3813ebc6d495-3/ee9294d5-bd9a-42c2-a951-f62115092407"

  You can use the cluster_operation_arn to query the broker's state during its recovery via the AWS CLI

  >aws kafka describe-cluster-operation --cluster-operation-arn **arn:aws:kafka:us-east-1:123456789012:cluster-operation/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3/c21d622d-caf9-4df5-80fe-80f04ddd8816**
schemaVersion: '0.3'
assumeRole: 'arn:aws:iam::123456789101:role/SSMDocumentLambdaRole'
parameters:
  clusterArn:
    type: String
    description: The ARN of the MSK cluster to reboot
  brokerId:
    type: String
    description: The ID of the MSK broker to reboot
mainSteps:
  - name: RebootBroker
    action: 'aws:invokeLambdaFunction'
    inputs:
      InvocationType: RequestResponse
      Payload: '"{\"clusterArn\": \"{{clusterArn}}\", \"brokerId\":\"{{brokerId}}\"}"'
      FunctionName: 'arn:aws:lambda:us-east-1:123456789101:function:reboot-msk-broker'
    outputs:
      - Name: brokerId
        Selector: $.Payload.brokerId
        Type: String
      - Name: cluster_operation_state
        Selector: $.Payload.cluster_operation_state
        Type: String
      - Name: cluster_operation_type
        Selector: $.Payload.cluster_operation_type
        Type: String
      - Name: cluster_operation_arn
        Selector: $.Payload.cluster_operation_arn
        Type: String
EOF
10. Create the SSM document that takes the input for clusterArn and brokerId. This document will be called by the Fault Injection Service experiment.
aws ssm create-document \
  --content file://RebootMskBroker.yml \
  --name "RebootMskBrokerNow" \
  --document-type "Automation" \
  --document-format YAML
11. Create the FIS experiment template. Replace the account ID, Region and ARNs with your own values, and note the template ID that is returned, as we will need it in the next step.
aws fis create-experiment-template \
  --cli-input-json '{
    "description": "RebootMSKBroker",
    "targets": {},
    "actions": {
      "RebootMSK": {
        "actionId": "aws:ssm:start-automation-execution",
        "parameters": {
          "documentArn": "arn:aws:ssm:us-east-1:123456789101:document/RebootMskBrokerNow",
          "documentParameters": "{\"clusterArn\": \"arn:aws:kafka:us-east-1:123456789101:cluster/demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3\", \"brokerId\": \"3\"}",
          "maxDuration": "PT1M"
        }
      }
    },
    "stopConditions": [ { "source": "none" } ],
    "roleArn": "arn:aws:iam::123456789101:role/ssm-fis-role",
    "tags": { "Name": "RebootMskBroker1" },
    "logConfiguration": {
      "cloudWatchLogsConfiguration": {
        "logGroupArn": "arn:aws:logs:us-east-1:123456789101:log-group:/aws/fis/experiments:*"
      },
      "logSchemaVersion": 2
    }
  }'
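The documentParameters value is a JSON object encoded as a string inside the template JSON, so its inner quotes have to be escaped. Escaping by hand is error-prone; one option is to let json.dumps do it. The sketch below is a minimal illustration that reuses the placeholder ARNs from this post and only builds the action part of the template; the variable names are my own.

```python
import json

# Parameters for the SSM document, as a normal dictionary.
document_parameters = {
    "clusterArn": (
        "arn:aws:kafka:us-east-1:123456789101:cluster/"
        "demo-cluster-1/85751407-8d09-48b7-af7b-ab25de795414-3"
    ),
    "brokerId": "3",
}

template = {
    "description": "RebootMSKBroker",
    "targets": {},
    "actions": {
        "RebootMSK": {
            "actionId": "aws:ssm:start-automation-execution",
            "parameters": {
                "documentArn": "arn:aws:ssm:us-east-1:123456789101:document/RebootMskBrokerNow",
                # First encoding: the parameters become a JSON string.
                "documentParameters": json.dumps(document_parameters),
                "maxDuration": "PT1M",
            },
        }
    },
    "stopConditions": [{"source": "none"}],
    "roleArn": "arn:aws:iam::123456789101:role/ssm-fis-role",
}

# Second encoding: serializing the template escapes the inner quotes.
template_json = json.dumps(template, indent=2)
print(template_json)
```

The printed JSON can be saved to a file, for example template.json, and passed to the CLI with --cli-input-json file://template.json.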
12. Start the experiment from the command line
aws fis start-experiment \
  --experiment-template-id EXTWTgEXTuhavDn
You can now go to the MSK console, where you will see your broker rebooting.
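If you prefer to follow the reboot from code rather than the console, you can poll the cluster operation whose ARN is returned by the SSM document. The sketch below takes any client object with a describe_cluster_operation method, such as a boto3 kafka client. It assumes that UPDATE_COMPLETE and UPDATE_FAILED are the terminal operation states, so check the MSK API reference for the full list. The function name and the polling defaults are hypothetical.

```python
import time

# Assumption: these are the states in which the operation has finished.
TERMINAL_STATES = {"UPDATE_COMPLETE", "UPDATE_FAILED"}


def wait_for_operation(client, cluster_operation_arn, poll_seconds=15, max_attempts=40):
    """Poll the cluster operation until it reaches a terminal state.

    Returns the last observed state, which may be non-terminal if
    max_attempts is exhausted first.
    """
    state = None
    for _ in range(max_attempts):
        response = client.describe_cluster_operation(
            ClusterOperationArn=cluster_operation_arn
        )
        state = response["ClusterOperationInfo"]["OperationState"]
        print(f"Operation state: {state}")
        if state in TERMINAL_STATES:
            break
        time.sleep(poll_seconds)
    return state
```

With credentials configured, it could be called as wait_for_operation(boto3.client("kafka"), cluster_operation_arn), where cluster_operation_arn is the value from the document output.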