Aug 31, 2021

Example of how to analyze DynamoDB item changes with Kinesis and Athena, created with CDK

This post shows how to stream the data changes of a DynamoDB table via Kinesis Data Streams and Kinesis Data Firehose to S3, and how to analyze the data with Athena. Everything is built with CDK.

This is the same setup as described [here]({{ site.baseurl }}/aws/2021/08/27/aws_example_ddb_analytics/), but built with CDK instead of Terraform.

To initialize the project, run this command: cdk init app --language typescript. Further information is here.

All the services are in this file.

KMS key

This creates a KMS key with an alias. The key encrypts the data in all the created services.

const kmsKey = new kms.Key(this, 'kmsKey', {
  enableKeyRotation: true,
})

kmsKey.addAlias(name)
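A note on the alias name: as far as I know, CDK's Alias construct prepends alias/ when it's missing, and KMS only accepts alphanumerics, forward slashes, underscores and dashes, and rejects names starting with alias/aws/. The following is a simplified, hypothetical re-implementation of that normalization (toKmsAliasName is a made-up helper, not a CDK API), useful to sanity-check the name value before deploying.

```typescript
// Hypothetical helper, not part of the CDK API: approximates how an alias
// name is normalized and validated before it reaches KMS.
const REQUIRED_PREFIX = 'alias/'
const RESERVED_PREFIX = 'alias/aws/'
// Alphanumerics, forward slashes, underscores and dashes after the prefix.
const VALID_ALIAS = /^alias\/[a-zA-Z0-9/_-]+$/

function toKmsAliasName(name: string): string {
  // Prepend the mandatory prefix if the caller left it out.
  const aliasName = name.startsWith(REQUIRED_PREFIX) ? name : `${REQUIRED_PREFIX}${name}`

  // Names starting with alias/aws/ are reserved for AWS managed keys.
  if (aliasName.toLowerCase().startsWith(RESERVED_PREFIX)) {
    throw new Error(`Alias must not start with ${RESERVED_PREFIX}: ${aliasName}`)
  }
  // Reject names that are too long, empty after the prefix, or contain other characters.
  if (aliasName.length > 256 || !VALID_ALIAS.test(aliasName)) {
    throw new Error(`Invalid KMS alias name: ${aliasName}`)
  }
  return aliasName
}

console.log(toKmsAliasName('ddb-analytics')) // alias/ddb-analytics
```

Passing a name that already starts with alias/ returns it unchanged, so the helper is safe to call on either form.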

DynamoDB and Kinesis Data Stream

This creates the DynamoDB table together with the Kinesis Data Stream.

const stream = new kinesis.Stream(this, 'Stream', {
  streamName: `${name}-data-stream`,
  encryption: kinesis.StreamEncryption.KMS,
  encryptionKey: kmsKey,
})

const table = new dynamodb.Table(this, 'Table', {
  tableName: name,
  partitionKey: { name: 'pk', type: dynamodb.AttributeType.STRING },
  billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
  encryption: dynamodb.TableEncryption.CUSTOMER_MANAGED,
  encryptionKey: kmsKey,
  // Sends every item change of the table to the Kinesis Data Stream
  kinesisStream: stream,
})

The kinesisStream property connects the Kinesis Data Stream to the DynamoDB table, so every item change in the table is written to the stream.

![kinesis data stream]({{ site.baseurl }}/img/2021-10-26-aws_example_ddb_analytics_cdk/kinesis_data_stream.png)

![kinesis data stream ddb]({{ site.baseurl }}/img/2021-10-26-aws_example_ddb_analytics_cdk/kinesis_data_stream_ddb.png)
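Each record in the data stream is a JSON document describing one item change. As far as I know from the AWS documentation, it carries fields such as eventName (INSERT, MODIFY or REMOVE), tableName, and a dynamodb object with Keys, NewImage and OldImage in DynamoDB's typed attribute-value format (for example {"S": "..."} or {"N": "42"}). The sketch below assumes that simplified shape and flattens an image into plain values. The type and function names are hypothetical, and only a subset of attribute types (no sets or binary) is handled.

```typescript
// Assumed (simplified) typed attribute values as they appear in the change records.
type AttributeValue =
  | { S: string }
  | { N: string }
  | { BOOL: boolean }
  | { NULL: true }
  | { M: Record<string, AttributeValue> }
  | { L: AttributeValue[] }

// Assumed (simplified) change record; real records carry more fields.
interface ChangeRecord {
  eventName: 'INSERT' | 'MODIFY' | 'REMOVE'
  tableName: string
  dynamodb: {
    Keys: Record<string, AttributeValue>
    NewImage?: Record<string, AttributeValue>
    OldImage?: Record<string, AttributeValue>
  }
}

type Plain = string | number | boolean | null | Plain[] | { [key: string]: Plain }

// Convert one typed attribute value into a plain JavaScript value.
function unmarshallValue(value: AttributeValue): Plain {
  if ('S' in value) return value.S
  if ('N' in value) return Number(value.N) // numbers arrive as strings
  if ('BOOL' in value) return value.BOOL
  if ('NULL' in value) return null
  if ('M' in value) return unmarshallImage(value.M)
  return value.L.map(unmarshallValue)
}

// Convert a whole item image (attribute name -> typed value).
function unmarshallImage(image: Record<string, AttributeValue>): { [key: string]: Plain } {
  const result: { [key: string]: Plain } = {}
  for (const [key, value] of Object.entries(image)) {
    result[key] = unmarshallValue(value)
  }
  return result
}

const record: ChangeRecord = {
  eventName: 'MODIFY',
  tableName: 'my-table',
  dynamodb: {
    Keys: { pk: { S: 'user#1' } },
    NewImage: { pk: { S: 'user#1' }, visits: { N: '3' } },
    OldImage: { pk: { S: 'user#1' }, visits: { N: '2' } },
  },
}
console.log(record.eventName, unmarshallImage(record.dynamodb.NewImage ?? {}))
// MODIFY { pk: 'user#1', visits: 3 }
```

For MODIFY events, comparing the flattened OldImage and NewImage shows which attributes changed; REMOVE events only carry the OldImage.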

Kinesis Data Firehose and S3 Bucket

Kinesis Data Firehose connects the Kinesis Data Stream to the S3 bucket.

Unfortunately, Firehose writes the JSON records to S3 without a line feed between them, while Glue and Athena expect one JSON document per line. Therefore, a Lambda function is necessary to convert the records.

More about that is described in this post.

It looks like this.

const firehoseBucket = new s3.Bucket(this, 'firehose-s3-bucket', {
  bucketName: `${name}-firehose-s3-bucket`,
  encryptionKey: kmsKey,
})

// Lambda function that converts the records before they are written to S3
const processor = new lambda.NodejsFunction(this, 'lambda-function-processor', {
  functionName: `${name}-firehose-converter`,
  timeout: cdk.Duration.minutes(2),
  bundling: {
    sourceMap: true,
  },
})

const lambdaProcessor = new LambdaFunctionProcessor(processor, {
  retries: 5,
})

const s3Destination = new destinations.S3Bucket(firehoseBucket, {
  encryptionKey: kmsKey,
  bufferingInterval: cdk.Duration.seconds(60),
  processor: lambdaProcessor,
})

// Reads from the Kinesis Data Stream and delivers the converted records to S3
const firehoseDeliveryStream = new firehose.DeliveryStream(this, 'Delivery Stream', {
  deliveryStreamName: `${name}-firehose`,
  sourceStream: stream,
  destinations: [s3Destination],
})
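The CDK code above only wires up the converter; the handler itself lives in a separate file. If I read the NodejsFunction docs correctly, without an explicit entry it looks for a file next to the stack file named after the construct id (for example <stack-file>.lambda-function-processor.ts) and calls its exported handler. The following is a minimal sketch of such a handler, assuming the standard Firehose data-transformation contract: each incoming record carries a recordId and base64-encoded data, and the response must return each recordId with a result of Ok, Dropped or ProcessingFailed. The interfaces are declared inline so the snippet doesn't depend on a types package; in a real project the FirehoseTransformationEvent types from @types/aws-lambda are likely the better choice.

```typescript
// Minimal types for the Firehose data-transformation contract (declared inline).
interface FirehoseRecord {
  recordId: string
  approximateArrivalTimestamp: number
  data: string // base64-encoded payload
}

interface FirehoseEvent {
  invocationId: string
  deliveryStreamArn: string
  region: string
  records: FirehoseRecord[]
}

interface FirehoseResultRecord {
  recordId: string
  result: 'Ok' | 'Dropped' | 'ProcessingFailed'
  data: string // base64-encoded payload
}

// Append a line feed to every record so each JSON document ends up on its own line in S3.
function transform(event: FirehoseEvent): { records: FirehoseResultRecord[] } {
  const records = event.records.map((record): FirehoseResultRecord => {
    const json = Buffer.from(record.data, 'base64').toString('utf8')
    // Avoid a double line feed if the producer already added one.
    const line = json.endsWith('\n') ? json : `${json}\n`
    return {
      recordId: record.recordId, // must match the incoming record
      result: 'Ok',
      data: Buffer.from(line, 'utf8').toString('base64'),
    }
  })
  return { records }
}

// Lambda entry point; NodejsFunction uses an export named "handler" by default.
export const handler = async (event: FirehoseEvent) => transform(event)
```

Keeping the conversion in a plain transform function makes it testable without Lambda. The response has to echo each recordId, because Firehose treats a mismatch between the original and the transformed record ID as a transformation failure.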

Firehose buffers the data before delivering it to the S3 bucket. These are the default buffer values; the code above lowers the buffering interval to 60 seconds.

![firehose-buffer]({{ site.baseurl }}/img/2021-10-26-aws_example_ddb_analytics_cdk/firehose_buffer.png)
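Firehose delivers a buffer to S3 as soon as either the size limit or the interval is reached, whichever comes first. The following is a simplified, hypothetical model of that rule (not Firehose's actual implementation; real delivery times vary) that helps estimate when data shows up in the bucket. The 60-second interval matches the code above, and the 5 MiB size is, as far as I know, the default buffer size.

```typescript
// Simplified model of Firehose buffering: a batch is flushed when it reaches
// bufferSizeBytes or when intervalSeconds have passed since the batch started,
// whichever happens first.
interface IncomingRecord {
  arrivalSeconds: number // seconds since an arbitrary start, in ascending order
  sizeBytes: number
}

function flushTimes(
  records: IncomingRecord[],
  intervalSeconds = 60,
  bufferSizeBytes = 5 * 1024 * 1024,
): number[] {
  const flushes: number[] = []
  let batchStart: number | undefined = undefined
  let batchBytes = 0

  for (const record of records) {
    // The interval expired before this record arrived: flush the open batch.
    if (batchStart !== undefined && record.arrivalSeconds >= batchStart + intervalSeconds) {
      flushes.push(batchStart + intervalSeconds)
      batchStart = undefined
      batchBytes = 0
    }
    if (batchStart === undefined) batchStart = record.arrivalSeconds
    batchBytes += record.sizeBytes
    // The size limit is reached: flush at this record's arrival time.
    if (batchBytes >= bufferSizeBytes) {
      flushes.push(record.arrivalSeconds)
      batchStart = undefined
      batchBytes = 0
    }
  }
  // Whatever is still buffered is flushed once its interval expires.
  if (batchStart !== undefined) flushes.push(batchStart + intervalSeconds)
  return flushes
}

console.log(flushTimes([
  { arrivalSeconds: 0, sizeBytes: 100 },
  { arrivalSeconds: 10, sizeBytes: 100 },
  { arrivalSeconds: 70, sizeBytes: 100 },
])) // [ 60, 130 ]
```

With low test traffic the size limit is never reached, so the interval alone decides the delay; that's why a shorter interval makes test data appear in S3 sooner.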

Glue crawler

Athena needs a table definition for the SQL queries. The Glue crawler creates it from the data in the S3 bucket.

The Glue crawler isn’t available as an L2 construct yet, so it needs the L1 construct CfnCrawler. More about L1 to L3 constructs is here.

There is already a GitHub issue to create an L2 construct for the Glue crawler.

const getSecurityConfiguration = new iam.PolicyDocument({
  statements: [
    new iam.PolicyStatement({
      actions: ['glue:GetSecurityConfiguration'],
      resources: ['*'],
    }),
  ],
})

const roleCrawler = new iam.Role(this, 'role crawler', {
  roleName: `${name}-crawler-role`,
  assumedBy: new iam.ServicePrincipal('glue.amazonaws.com'),
  inlinePolicies: {
    GetSecurityConfiguration: getSecurityConfiguration,
  },
})

const glueDb = new glue.Database(this, 'glue db', {
  databaseName: `${name}-db`,
})

const glueSecurityOptions = new glue.SecurityConfiguration(this, 'glue security options', {
  securityConfigurationName: `${name}-security-options`,
  s3Encryption: {
    mode: glue.S3EncryptionMode.KMS,
  },
})

// L1 construct, because there is no L2 construct for the crawler yet
const crawler = new glue.CfnCrawler(this, 'crawler', {
  name: `${name}-crawler`,
  role: roleCrawler.roleArn,
  targets: {
    s3Targets: [
      {
        path: `s3://${firehoseBucket.bucketName}`,
      },
    ],
  },
  databaseName: glueDb.databaseName,
  crawlerSecurityConfiguration: glueSecurityOptions.securityConfigurationName,
})

const glueCrawlerLogArn = `arn:aws:logs:${cdk.Stack.of(this).region}:${cdk.Stack.of(this).account}:log-group:/aws-glue/crawlers*`

const glueTableArn = `arn:aws:glue:${cdk.Stack.of(this).region}:${cdk.Stack.of(this).account}:table/${glueDb.databaseName}/*`

const glueCrawlerArn = `arn:aws:glue:${cdk.Stack.of(this).region}:${cdk.Stack.of(this).account}:crawler/${crawler.name}`

// Broad wildcard actions, fine for testing; narrow them for production use
roleCrawler.addToPolicy(
  new iam.PolicyStatement({
    resources: [
      glueCrawlerLogArn,
      glueTableArn,
      glueDb.catalogArn,
      glueDb.databaseArn,
      kmsKey.keyArn,
      firehoseBucket.bucketArn,
      `${firehoseBucket.bucketArn}/*`,
      glueCrawlerArn,
    ],
    actions: ['logs:*', 'glue:*', 'kms:*', 's3:*'],
  })
)

For test purposes, it’s enough to run the crawler manually before any analysis. Scheduling is also possible.

![glue-run-crawler]({{ site.baseurl }}/img/2021-10-26-aws_example_ddb_analytics_cdk/glue_run_crawler.png)

That creates this table, which is accessible by Athena.

![glue-table]({{ site.baseurl }}/img/2021-10-26-aws_example_ddb_analytics_cdk/glue_table.png)
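By default, Firehose writes the objects under a UTC time prefix in the format YYYY/MM/DD/HH/. Because these folder names aren’t in key=value form, the crawler names the resulting partition columns partition_0, partition_1 and so on. The sketch below assumes that default layout (no custom prefix is configured in the code above) and computes the prefix and the matching partition values for a timestamp, which is handy for partition filters in Athena. The function firehosePrefix is hypothetical.

```typescript
// Assumes Firehose's default S3 prefix YYYY/MM/DD/HH/ (UTC) and crawler-generated
// partition columns partition_0..partition_3 (year, month, day, hour).
function firehosePrefix(date: Date): { prefix: string; partitions: Record<string, string> } {
  const pad = (n: number) => String(n).padStart(2, '0')
  const parts = [
    String(date.getUTCFullYear()),
    pad(date.getUTCMonth() + 1), // getUTCMonth is zero-based
    pad(date.getUTCDate()),
    pad(date.getUTCHours()),
  ]
  const partitions: Record<string, string> = {}
  parts.forEach((value, index) => {
    partitions[`partition_${index}`] = value
  })
  return { prefix: `${parts.join('/')}/`, partitions }
}

const { prefix, partitions } = firehosePrefix(new Date(Date.UTC(2021, 7, 31, 9, 15)))
console.log(prefix) // 2021/08/31/09/
console.log(`WHERE partition_0 = '${partitions.partition_0}' AND partition_1 = '${partitions.partition_1}'`)
// WHERE partition_0 = '2021' AND partition_1 = '08'
```

Filtering on the partition columns lets Athena skip the S3 folders outside the requested time range, which keeps the scanned data and the query costs low.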

Athena

Athena needs an S3 bucket for the query results and, for better isolation from other projects, a workgroup.

const athenaQueryResults = new s3.Bucket(this, 'query-results', {
  bucketName: `${name}-query-results`,
  encryptionKey: kmsKey,
})

// Workgroup that writes KMS-encrypted query results to the bucket above
new athena.CfnWorkGroup(this, 'analytics-athena-workgroup', {
  name: `${name}-workgroup`,
  workGroupConfiguration: {
    resultConfiguration: {
      outputLocation: `s3://${athenaQueryResults.bucketName}`,
      encryptionConfiguration: {
        encryptionOption: 'SSE_KMS',
        kmsKey: kmsKey.keyArn,
      },
    },
  },
})

How to analyze the data is described [here]({{ site.baseurl }}/aws/2021/08/27/aws_example_ddb_analytics/).

Cost Alert 💰

⚠️ Don’t forget to destroy the stack after testing (cdk destroy). Kinesis Data Streams is charged per hour.

Code

https://github.com/JohannesKonings/test-aws-dynamodb-athena-cdk