AWS and other interesting stuff

Amazon Kinesis

Amazon Kinesis Firehose

Amazon Kinesis Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, or Amazon Elasticsearch Service (Amazon ES). With Firehose, you do not need to write applications or manage resources. You configure your data producers to send data to Firehose and it automatically delivers the data to the destination that you specified. You can also configure Firehose to transform your data before data delivery.
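
For example, a minimal boto3 sketch of the producer side, assuming a delivery stream named my-delivery-stream already exists (the name is hypothetical):

import boto3

firehose = boto3.client('firehose')

# Firehose buffers this record and delivers it to the configured
# destination (S3, Redshift or Elasticsearch).
firehose.put_record(
    DeliveryStreamName='my-delivery-stream',  # hypothetical name
    Record={'Data': b'some log line\n'},
)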

Amazon Kinesis Analytics

Amazon Kinesis Analytics enables you to create and run SQL queries on streaming data so that you can gain actionable insights and respond to your business and customer needs promptly.

Amazon Kinesis Streams

Amazon Kinesis Streams ingests a large amount of data in real time, durably stores the data, and makes the data available for consumption.

Concepts

  • Stream
    • A stream represents an ordered sequence of data records.
  • Shard
    • A shard is a group of data records in a stream.
    • When you create a stream, you specify the number of shards for the stream.
    • Each shard can support:
      • up to 5 transactions per second for reads, up to a maximum total data read rate of 2 MB per second
      • up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys)
    • The total capacity of a stream is the sum of the capacities of its shards.
    • You can increase or decrease the number of shards in a stream as needed. However, note that you are charged on a per-shard basis.

You can estimate the initial number of shards (number_of_shards) that your stream will need with the following formula:

number_of_shards = max(incoming_write_bandwidth_in_KB/1000, outgoing_read_bandwidth_in_KB/2000)
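
For example, with producers writing an aggregate of 2,000 KB/s and consumers reading 4,000 KB/s, you get max(2000/1000, 4000/2000) = max(2, 2) = 2 shards. A minimal sketch of the same calculation (rounding up, since shards are whole units):

import math

def number_of_shards(incoming_write_kb, outgoing_read_kb):
    # At least one shard; round partial shards up.
    return max(math.ceil(incoming_write_kb / 1000.0),
               math.ceil(outgoing_read_kb / 2000.0),
               1)

print(number_of_shards(2000, 4000))  # 2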

The number of partition keys should typically be much greater than the number of shards. This is because the partition key is used to determine how to map a data record to a particular shard. If you have enough partition keys, the data can be evenly distributed across the shards in a stream.
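
Concretely, Kinesis takes the MD5 hash of the partition key as a 128-bit integer and routes the record to the shard whose hash key range contains it. A rough sketch, assuming the shards split the hash space evenly (which is how create-stream lays them out):

import hashlib

def shard_for_key(partition_key, num_shards):
    # 128-bit MD5 hash of the partition key...
    h = int(hashlib.md5(partition_key.encode('utf-8')).hexdigest(), 16)
    # ...mapped into num_shards equal hash key ranges.
    return h * num_shards // 2 ** 128

print(shard_for_key('123', 5))  # 0, matching shardId-000000000000 in the test below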

  • Producer
    • A producer puts data records into shards.
    • For example, a web server sending log data to an Amazon Kinesis stream is a producer.
    • To put data into the stream, you must specify the name of the stream, a partition key, and the data blob to be added to the stream. The partition key determines which shard in the stream the data record is added to (see the sketch after this list).
  • Consumer
    • A consumer gets data records from shards.
    • Each consumer reads from a particular shard using a shard iterator, which represents the position in the stream from which the consumer will read.
    • You can use the Amazon Kinesis Client Library (KCL) to simplify parallel processing of the stream by a fleet of workers running on EC2 instances. The KCL simplifies writing code to read from the shards in the stream, ensures that there is a worker allocated to every shard, and provides fault tolerance through checkpointing.
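
A minimal boto3 sketch of both roles, using the stream name and test data from the Testing section below (a real consumer fleet would normally use the KCL instead):

import boto3

kinesis = boto3.client('kinesis')

# Producer: the partition key decides which shard the record lands in.
kinesis.put_record(StreamName='Foo', PartitionKey='123', Data=b'testdata')

# Consumer: read one shard from its oldest record (TRIM_HORIZON).
it = kinesis.get_shard_iterator(
    StreamName='Foo',
    ShardId='shardId-000000000000',
    ShardIteratorType='TRIM_HORIZON',
)['ShardIterator']

while it:
    resp = kinesis.get_records(ShardIterator=it, Limit=100)
    for record in resp['Records']:
        # boto3 returns Data already Base64-decoded, as bytes
        print(record['PartitionKey'], record['Data'])
    if not resp['Records'] and resp['MillisBehindLatest'] == 0:
        break  # caught up with the tip of the shard
    it = resp['NextShardIterator']  # continue where we left off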

Properties

  • Retention Period:
    • Data records are accessible for a default of 24 hours from the time they are added to a stream.
    • Can be adjusted in hourly increments from 24 to 168 hours (1 to 7 days); see the sketch below.
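
A sketch of bumping retention to the maximum, assuming a stream named Foo:

import boto3

kinesis = boto3.client('kinesis')

# Raise retention from the default 24 hours to the 168-hour maximum;
# decrease_stream_retention_period works the same way in reverse.
kinesis.increase_stream_retention_period(StreamName='Foo',
                                         RetentionPeriodHours=168)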

Use Cases

  • Fast log and data feed intake and processing
  • Real-time metrics and reporting
  • Real-time data analytics
    • e.g. click-tracking
  • Complex stream processing
    • e.g. merging one stream into another

Benefits

  • Real-time aggregation of data
  • Loading the aggregate data into a data warehouse / map reduce cluster
  • Durability and Elasticity
  • Parallel application readers

Kinesis Streams vs Simple Queue Service (SQS)

Kinesis:

  • The same data records can be processed at the same time, or at different times within the retention window, by different consumers.
  • Data can be replayed within this window.
  • 24 hours of retention by default, up to 7 days in 1 hour increments.
  • You can put batches of up to 500 records (PutRecords) and pull batches of up to 10,000 records (GetRecords); see the sketch below.
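
A sketch of a batched write, assuming a stream named Foo (500 is the PutRecords per-call maximum):

import boto3

kinesis = boto3.client('kinesis')

resp = kinesis.put_records(
    StreamName='Foo',
    Records=[{'Data': ('testdata-%d' % i).encode(), 'PartitionKey': '123-%d' % i}
             for i in range(500)],
)
# Individual records can fail even when the call succeeds;
# anything counted here has to be retried by the caller.
print(resp['FailedRecordCount'])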

SQS:

  • You can fan out to multiple queues, but once a message has been consumed and deleted it cannot be processed again later.
  • 4 days of retention by default, minimum of 60 seconds and up to 14 days.
  • You can put at most 10 messages and pull at most 10 messages at a time (see the sketch below).
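
For comparison, a sketch of the SQS batch limits (the queue URL is hypothetical):

import boto3

sqs = boto3.client('sqs')
queue_url = 'https://sqs.us-east-1.amazonaws.com/123456789012/foo'  # hypothetical

# At most 10 messages per batched send...
sqs.send_message_batch(
    QueueUrl=queue_url,
    Entries=[{'Id': str(i), 'MessageBody': 'testdata-%d' % i} for i in range(10)],
)

# ...and at most 10 messages per receive.
msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10)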

Testing

Single Shard

Reference: http://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html

Create a stream:

$ aws kinesis create-stream --stream-name Foo --shard-count 1

The stream is initially in the CREATING state, then it becomes ACTIVE.
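
You can poll for this yourself with describe-stream, or let a boto3 waiter block until the stream is usable (a sketch, assuming the stream name above):

import boto3

kinesis = boto3.client('kinesis')

# Polls DescribeStream until the status is ACTIVE.
kinesis.get_waiter('stream_exists').wait(StreamName='Foo')

status = kinesis.describe_stream(StreamName='Foo')['StreamDescription']['StreamStatus']
print(status)  # ACTIVE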

Put a record:

$ aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49569763848609267083528150493869570682713593090515402754"
}

Get a shard iterator:

$ I=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo | grep ShardIterator | cut -d \" -f 4)

Get the record(s):

$ aws kinesis get-records --shard-iterator $I
{
    "Records": [
        {
            "Data": "dGVzdGRhdGE=",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1485114494.304,
            "SequenceNumber": "49569763848609267083528150493869570682713593090515402754"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAHCl0W+jhuZFZoiONvjc4Q0c9v8rt3SMtDYKRjz3DZJT6cCF/rv79nEngomDWqGLxTfpHHiH9VNbCuhDYBJfoGuO9F34XBxHnnETQDjx+BxOoAyw7rRydfIxAfi5X7fusIWH4oHC+L9aihceV6gjiyxoTCyJkVPbs3tF0a4pjuupHV66g+anfozh8X6doC+kDESLl+V/b5BjiyJ4fx6/qv0",
    "MillisBehindLatest": 0
}

Note:

  • The get-records request returns the NextShardIterator
  • Data is Base64 encoded
    • $ python -c "from base64 import b64decode; print(b64decode('dGVzdGRhdGE=').decode())"

Delete the stream:

$ aws kinesis delete-stream --stream-name Foo

The stream is initially in the DELETING state.

5 Shards

$ aws kinesis create-stream --stream-name Foo --shard-count 5

This creates 5 shards: shardId-000000000000 to shardId-000000000004
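
Each shard owns a contiguous slice of the 128-bit hash key space, which you can inspect (a sketch):

import boto3

kinesis = boto3.client('kinesis')

for shard in kinesis.describe_stream(StreamName='Foo')['StreamDescription']['Shards']:
    r = shard['HashKeyRange']
    print(shard['ShardId'], r['StartingHashKey'], r['EndingHashKey'])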

Put records with the same partition key …

$ for i in $(seq 1 10); do aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata-$i; done
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49569764140124608318720456199926556951384249175887577090"
}
etc...

… the records end up in the same shard

Put records with different partition keys …

$ for i in $(seq 1 10); do aws kinesis put-record --stream-name Foo --partition-key 123-$i --data testdata-$i; done
{
    "ShardId": "shardId-000000000004",
    "SequenceNumber": "49569764210862572088459592804759380581902632576106889282"
}
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49569764210773369107665470312194446634631653759257673730"
}
{
    "ShardId": "shardId-000000000001",
    "SequenceNumber": "49569764210795669852864000935337191278723916818657837074"
}
{
    "ShardId": "shardId-000000000003",
    "SequenceNumber": "49569764210840271343261062181621471641088828308283457586"
}
{
    "ShardId": "shardId-000000000003",
    "SequenceNumber": "49569764210840271343261062181622680566908442937458163762"
}
{
    "ShardId": "shardId-000000000001",
    "SequenceNumber": "49569764210795669852864000935340818056182760912340385810"
}
{
    "ShardId": "shardId-000000000001",
    "SequenceNumber": "49569764210795669852864000935342026982002375610234568722"
}
{
    "ShardId": "shardId-000000000003",
    "SequenceNumber": "49569764210840271343261062181626307344367287031140712498"
}
{
    "ShardId": "shardId-000000000004",
    "SequenceNumber": "49569764210862572088459592804769051988459550090540875842"
}
{
    "ShardId": "shardId-000000000002",
    "SequenceNumber": "49569764210817970598062531558487189477733867996703621154"
}

… and they end up on different shards.

$ I=$(aws kinesis get-shard-iterator --shard-id shardId-000000000001 --shard-iterator-type TRIM_HORIZON --stream-name Foo | grep ShardIterator | cut -d \" -f 4)
$ aws kinesis get-records --shard-iterator $I | grep "Data" | cut -d \" -f 4
dGVzdGRhdGEtMw==
dGVzdGRhdGEtNg==
dGVzdGRhdGEtNw==

…which Base64-decode to:

testdata-3
testdata-6
testdata-7