Oct 26, 2021

Example: how to analyze DynamoDB item changes with Kinesis and Athena, created with CDK

This post shows how to stream data changes of a DynamoDB table via Kinesis Data Streams and Kinesis Data Firehose to S3, and how to analyze the data with Athena. Everything is built with CDK.

This is the same setup as described [here]({{ site.baseurl }}/aws/2021/08/27/aws_example_ddb_analytics/), but built with CDK instead of Terraform.

To bootstrap the project, run this command:

cdk init app --language typescript

Further information is here

All the services are in this file.

Updates

2022-09-11: Add prefix for Kinesis Data Firehose S3 destination

2022-08-13: CDK migrated to v2

KMS key

This creates a KMS key with an alias to encrypt the data in the created services.

const kmsKey = new kms.Key(this, "kmsKey", {
  enableKeyRotation: true,
});
 
kmsKey.addAlias(name);

DynamoDB and Kinesis Data Stream

This creates the DynamoDB table and the Kinesis data stream that receives its item changes.

const stream = new kinesis.Stream(this, "Stream", {
  streamName: `${name}-data-stream`,
  encryption: kinesis.StreamEncryption.KMS,
  encryptionKey: kmsKey,
});
 
const table = new dynamodb.Table(this, "Table", {
  tableName: name,
  partitionKey: { name: "pk", type: dynamodb.AttributeType.STRING },
  billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
  encryption: dynamodb.TableEncryption.CUSTOMER_MANAGED,
  encryptionKey: kmsKey,
  kinesisStream: stream,
});

This creates the Kinesis data stream and connects it to the DynamoDB table, so that item changes are written to the stream.

![kinesis data stream]({{ site.baseurl }}/img/2021-10-26-aws_example_ddb_analytics_cdk/kinesis_data_stream.png)

![kinesis data stream ddb]({{ site.baseurl }}/img/2021-10-26-aws_example_ddb_analytics_cdk/kinesis_data_stream_ddb.png)
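To make the later analysis easier to follow, here is a minimal sketch of what one change record in the stream looks like and how its old and new images could be compared. The field names follow the record format DynamoDB uses when writing to Kinesis Data Streams. The sample values, the `status` attribute, and the helper `changedStringAttributes` are made up for illustration and are not part of the project.

```typescript
// Subset of a change record as DynamoDB writes it to the Kinesis data stream.
// Attribute values use DynamoDB JSON, for example { "S": "text" } for a string.
type AttributeValue = { S?: string; N?: string; BOOL?: boolean };

interface DdbChangeRecord {
  eventName: "INSERT" | "MODIFY" | "REMOVE";
  tableName: string;
  dynamodb: {
    ApproximateCreationDateTime: number;
    Keys: Record<string, AttributeValue>;
    NewImage?: Record<string, AttributeValue>;
    OldImage?: Record<string, AttributeValue>;
  };
}

// Hypothetical sample record for a table with the partition key "pk".
const sampleJson = JSON.stringify({
  eventName: "MODIFY",
  tableName: "example-table",
  dynamodb: {
    ApproximateCreationDateTime: 1635235200000,
    Keys: { pk: { S: "user#1" } },
    OldImage: { pk: { S: "user#1" }, status: { S: "new" } },
    NewImage: { pk: { S: "user#1" }, status: { S: "active" } },
  },
});

// Returns the names of attributes whose string value differs between
// OldImage and NewImage (illustrative helper, string attributes only).
function changedStringAttributes(record: DdbChangeRecord): string[] {
  const oldImage: Record<string, AttributeValue> = record.dynamodb.OldImage ?? {};
  const newImage: Record<string, AttributeValue> = record.dynamodb.NewImage ?? {};
  const names = Object.keys({ ...oldImage, ...newImage });
  return names.filter((n) => oldImage[n]?.S !== newImage[n]?.S).sort();
}

const sample = JSON.parse(sampleJson) as DdbChangeRecord;
console.log(sample.eventName, changedStringAttributes(sample));
```

Records of this shape are what ends up in S3 and what the Glue crawler later turns into table columns.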

Kinesis Data Firehose and S3 Bucket

Kinesis Data Firehose connects the Kinesis data stream to the S3 bucket.

Unfortunately, Firehose stores the JSON records without a linefeed between them. Therefore, a Lambda function is needed to convert the records.

More about that is described in this post
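As an illustration of the conversion, the following is a minimal sketch of a Firehose transformation handler that appends a linefeed to each record. It is not the Lambda from the repository. The local types and the helper name `addLinefeed` are assumptions, and only the record contract (`recordId`, `result`, base64-encoded `data`) comes from the Firehose data transformation model.

```typescript
// Minimal local types for the Firehose transformation contract, so the
// sketch does not depend on an external type package.
interface FirehoseInputRecord {
  recordId: string;
  data: string; // base64-encoded payload
}

interface FirehoseOutputRecord {
  recordId: string;
  result: "Ok" | "Dropped" | "ProcessingFailed";
  data: string; // base64-encoded payload
}

// Appends a linefeed to every record so that each JSON document ends up
// on its own line in the S3 object.
function addLinefeed(event: { records: FirehoseInputRecord[] }): {
  records: FirehoseOutputRecord[];
} {
  const records = event.records.map((record): FirehoseOutputRecord => {
    const json = Buffer.from(record.data, "base64").toString("utf8");
    return {
      recordId: record.recordId, // must match the incoming record ID
      result: "Ok",
      data: Buffer.from(`${json}\n`, "utf8").toString("base64"),
    };
  });
  return { records };
}

// Lambda entry point: Firehose invokes it with a batch of records.
export const handler = async (event: { records: FirehoseInputRecord[] }) =>
  addLinefeed(event);
```

With one JSON document per line, the crawler and Athena can read every record in an S3 object instead of only the first one.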

To keep the Kinesis Data Firehose data isolated under a “namespace”, the S3 destination uses a prefix.

The setup looks like this.

const firehoseBucket = new s3.Bucket(this, "firehose-s3-bucket", {
  bucketName: `${name}-firehose-s3-bucket`,
  encryptionKey: kmsKey,
});
 
const processor = new lambda.NodejsFunction(this, "lambda-function-processor", {
  functionName: `${name}-firehose-converter`,
  timeout: cdk.Duration.minutes(2),
  bundling: {
    sourceMap: true,
  },
});
 
const lambdaProcessor = new LambdaFunctionProcessor(processor, {
  retries: 5,
});
 
const ddbChangesPrefix = "ddb-changes";
 
const s3Destination = new destinations.S3Bucket(firehoseBucket, {
  encryptionKey: kmsKey,
  bufferingInterval: cdk.Duration.seconds(60),
  processor: lambdaProcessor,
  dataOutputPrefix: `${ddbChangesPrefix}/`,
});
 
const firehoseDeliveryStream = new firehose.DeliveryStream(
  this,
  "Delivery Stream",
  {
    deliveryStreamName: `${name}-firehose`,
    sourceStream: stream,
    destinations: [s3Destination],
  },
);

The delivery of the data to the S3 bucket is buffered. Here are the default values.

![firehose-buffer]({{ site.baseurl }}/img/2021-10-26-aws_example_ddb_analytics_cdk/firehose_buffer.png)

Glue crawler

Athena needs a structured table for the SQL queries. The Glue crawler creates this from the data in the S3 bucket below the configured prefix.

The Glue crawler isn’t available as an L2 construct yet, so it needs an L1 construct. See here for more about L1 to L3 constructs.

There is already a GitHub issue to create an L2 construct for the Glue crawler.

const getSecurityConfiguration = new iam.PolicyDocument({
  statements: [
    new iam.PolicyStatement({
      actions: ["glue:GetSecurityConfiguration"],
      resources: ["*"],
    }),
  ],
});
 
const roleCrawler = new iam.Role(this, "role crawler", {
  roleName: `${name}-crawler-role`,
  assumedBy: new iam.ServicePrincipal("glue.amazonaws.com"),
  inlinePolicies: {
    GetSecurityConfiguration: getSecurityConfiguration,
  },
});
 
const glueDb = new glue.Database(this, "glue db", {
  databaseName: `${name}-db`,
});
 
const glueSecurityOptions = new glue.SecurityConfiguration(
  this,
  "glue security options",
  {
    securityConfigurationName: `${name}-security-options`,
    s3Encryption: {
      mode: glue.S3EncryptionMode.KMS,
    },
  },
);
 
const crawler = new glue.CfnCrawler(this, "crawler", {
  name: `${name}-crawler`,
  role: roleCrawler.roleArn,
  targets: {
    s3Targets: [
      {
        path: `s3://${firehoseBucket.bucketName}/${ddbChangesPrefix}`,
      },
    ],
  },
  databaseName: glueDb.databaseName,
  crawlerSecurityConfiguration: glueSecurityOptions.securityConfigurationName,
});
 
// The trailing wildcard covers the crawler log group and all of its log streams.
const glueCrawlerLogArn = `arn:aws:logs:${cdk.Stack.of(this).region}:${cdk.Stack.of(this).account}:log-group:/aws-glue/crawlers*`;
 
const glueTableArn = `arn:aws:glue:${cdk.Stack.of(this).region}:${cdk.Stack.of(this).account}:table/${glueDb.databaseName}/*`;
 
const glueCrawlerArn = `arn:aws:glue:${cdk.Stack.of(this).region}:${cdk.Stack.of(this).account}:crawler/${crawler.name}`;
 
roleCrawler.addToPolicy(
  new iam.PolicyStatement({
    resources: [
      glueCrawlerLogArn,
      glueTableArn,
      glueDb.catalogArn,
      glueDb.databaseArn,
      kmsKey.keyArn,
      firehoseBucket.bucketArn,
      `${firehoseBucket.bucketArn}/*`,
      glueCrawlerArn,
    ],
    actions: ["logs:*", "glue:*", "kms:*", "s3:*"],
  }),
);

For test purposes, it’s enough to run the crawler before any analysis. Scheduling is also possible.
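For scheduling, the L1 construct `CfnCrawler` accepts a cron expression in its `schedule` property. The helper below is a small sketch that builds such an expression. The function name `glueCronEveryHours` and the six-hour interval are assumptions for illustration and are not taken from the project.

```typescript
// Builds an AWS Glue cron expression that fires at minute 0 of every n-th hour (UTC).
// Glue cron fields: minutes hours day-of-month month day-of-week year.
function glueCronEveryHours(everyHours: number): string {
  const isInteger = Math.floor(everyHours) === everyHours;
  if (!isInteger || everyHours < 1 || everyHours > 23) {
    throw new RangeError("everyHours must be an integer between 1 and 23");
  }
  return `cron(0 0/${everyHours} * * ? *)`;
}

// Hypothetical usage with the crawler definition from this post:
// new glue.CfnCrawler(this, "crawler", {
//   ...
//   schedule: { scheduleExpression: glueCronEveryHours(6) },
// });
console.log(glueCronEveryHours(6));
```

How often the crawler needs to run depends on how often the structure of the data changes, so a manual run can still be enough for a test setup.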

![glue-run-crawler]({{ site.baseurl }}/img/2021-10-26-aws_example_ddb_analytics_cdk/glue_run_crawler.png)

The crawler run creates this table, which Athena can query.

![glue-table]({{ site.baseurl }}/img/2021-10-26-aws_example_ddb_analytics_cdk/glue_table.png)

Athena

Athena needs an S3 bucket for the query results and, for better isolation from other projects, a workgroup.

const athenaQueryResults = new s3.Bucket(this, "query-results", {
  bucketName: `${name}-query-results`,
  encryptionKey: kmsKey,
});
 
new athena.CfnWorkGroup(this, "analytics-athena-workgroup", {
  name: `${name}-workgroup`,
  workGroupConfiguration: {
    resultConfiguration: {
      outputLocation: `s3://${athenaQueryResults.bucketName}`,
      encryptionConfiguration: {
        encryptionOption: "SSE_KMS",
        kmsKey: kmsKey.keyArn,
      },
    },
  },
});

For how to analyze the data, see also [here]({{ site.baseurl }}/aws/2021/08/27/aws_example_ddb_analytics/).
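As a starting point, the sketch below builds a simple Athena query that counts the recorded changes per event type. It assumes that the crawler exposes the `eventName` field of the change records as a column named `eventname`. The function name and the database and table names in the usage line are hypothetical and need to be replaced with the names from the Glue console.

```typescript
// Builds an Athena query that counts the recorded changes per event type.
// Database and table names are parameters because they depend on the deployment.
function buildChangesPerEventQuery(database: string, table: string): string {
  // Double quotes allow identifiers that contain characters such as hyphens.
  const quote = (identifier: string): string =>
    `"${identifier.replace(/"/g, '""')}"`;
  return [
    "SELECT eventname, count(*) AS changes",
    `FROM ${quote(database)}.${quote(table)}`,
    "GROUP BY eventname",
    "ORDER BY changes DESC",
  ].join("\n");
}

// Hypothetical names: "example-db" for the Glue database, "ddb_changes" for the table.
console.log(buildChangesPerEventQuery("example-db", "ddb_changes"));
```

The generated statement can be pasted into the Athena query editor with the workgroup created above selected.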

Cost Alert 💰

⚠️ Don’t forget to destroy the stack after testing. Kinesis Data Streams is billed per hour.

Code

https://github.com/JohannesKonings/test-aws-dynamodb-athena-cdk