How SOCAR handles large IoT data with Amazon MSK and Amazon ElastiCache for Redis

This is a guest post co-written with SangSu Park and JaeHong Ahn from SOCAR.

As companies continue to expand their digital footprint, the importance of real-time data processing and analysis cannot be overstated. The ability to quickly measure and draw insights from data is critical in today's business landscape, where rapid decision-making is key. With this capability, businesses can stay ahead of the curve and develop new initiatives that drive success.

This post is a continuation of How SOCAR built a streaming data pipeline to process IoT data for real-time analytics and control. In this post, we provide a detailed overview of streaming messages with Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon ElastiCache for Redis, covering technical aspects and design considerations that are essential for achieving optimal results.

SOCAR is the leading Korean mobility company with strong competitiveness in car-sharing. SOCAR wanted to design and build a solution for a new Fleet Management System (FMS). This system involves the collection, processing, storage, and analysis of Internet of Things (IoT) streaming data from various vehicle devices, as well as historical operational data such as location, speed, fuel level, and component status.

This post demonstrates a solution for SOCAR's production application that allows them to load streaming data from Amazon MSK into ElastiCache for Redis, enhancing the speed and efficiency of their data processing pipeline. We also discuss the key features, considerations, and design of the solution.


Background

SOCAR operates about 20,000 vehicles and is planning to include other large vehicle types such as commercial vehicles and courier trucks. SOCAR has deployed in-car devices that capture data using AWS IoT Core. This data was then stored in Amazon Relational Database Service (Amazon RDS). The challenges with this approach included inefficient performance and high resource usage. Therefore, SOCAR looked for purpose-built databases tailored to the needs of their application and usage patterns while meeting the future requirements of SOCAR's business and technical requirements. The key requirements for SOCAR included achieving maximum performance for real-time data analytics, which required storing data in an in-memory data store.

After careful consideration, ElastiCache for Redis was selected as the optimal solution due to its ability to handle complex data aggregation rules with ease. One of the challenges faced was loading data from Amazon MSK into the database, because there was no built-in Kafka connector and consumer available for this task. This post focuses on the development of a Kafka consumer application that was created to tackle this challenge by enabling performant data loading from Amazon MSK to Redis.

Solution overview

Extracting valuable insights from streaming data can be a challenge for businesses with diverse use cases and workloads. That's why SOCAR built a solution to seamlessly bring data from Amazon MSK into multiple purpose-built databases, while also empowering users to transform data as needed. With fully managed Apache Kafka, Amazon MSK provides a reliable and efficient platform for ingesting and processing real-time data.

The following figure shows an example of the data flow at SOCAR.

solution overview

This architecture consists of three components:

  • Streaming data – Amazon MSK serves as a scalable and reliable platform for streaming data, capable of receiving and storing messages from a variety of sources, including AWS IoT Core, with messages organized into multiple topics and partitions
  • Consumer application – With a consumer application, users can seamlessly bring data from Amazon MSK into a target database or data storage while also defining data transformation rules as needed
  • Target databases – With the consumer application, the SOCAR team was able to load data from Amazon MSK into two separate databases, each serving a specific workload

Although this post focuses on a specific use case with ElastiCache for Redis as the target database and a single topic called gps, the consumer application we describe can handle additional topics and messages, as well as different streaming sources and target databases such as Amazon DynamoDB. Our post covers the most important aspects of the consumer application, including its features and components, design considerations, and a detailed guide to the code implementation.

Components of the consumer application

The consumer application comprises three main parts that work together to consume, transform, and load messages from Amazon MSK into a target database. The following diagram shows an example of data transformations in the handler component.


The details of each component are as follows:

  • Consumer – This consumes messages from Amazon MSK and then forwards the messages to a downstream handler.
  • Loader – This is where users specify a target database. For example, SOCAR's target databases include ElastiCache for Redis and DynamoDB.
  • Handler – This is where users can apply data transformation rules to the incoming messages before loading them into a target database.

Features of the consumer application

This connector has three features:

  • Scalability – This solution is designed to be scalable, ensuring that the consumer application can handle an increasing volume of data and accommodate additional applications in the future. For example, SOCAR sought to develop a solution capable of handling not only the current data from approximately 20,000 vehicles but also a larger volume of messages as the business and data continue to grow rapidly.
  • Performance – With this consumer application, users can achieve consistent performance, even as the volume of source messages and target databases increases. The application supports multithreading, allowing for concurrent data processing, and can handle unexpected spikes in data volume by easily scaling up compute resources.
  • Flexibility – This consumer application can be reused for any new topics without having to build the entire consumer application again. The consumer application can be used to ingest new messages with different configuration values in the handler. SOCAR deployed multiple handlers to ingest many different messages. Also, this consumer application allows users to add additional target locations. For example, SOCAR initially developed a solution for ElastiCache for Redis and then replicated the consumer application for DynamoDB.

Design considerations of the consumer application

Note the following design considerations for the consumer application:

  • Scale out – A key design principle of this solution is scalability. To achieve this, the consumer application runs on Amazon Elastic Kubernetes Service (Amazon EKS) because it allows users to increase and replicate consumer applications easily.
  • Consumption patterns – To receive, store, and consume data efficiently, it's important to design Kafka topics depending on messages and consumption patterns. Depending on the messages consumed at the end, messages can be received into multiple topics of different schemas. For example, SOCAR has many topics that are consumed by different workloads.
  • Purpose-built database – The consumer application supports loading data into multiple target options based on the specific use case. For example, SOCAR stored real-time IoT data in ElastiCache for Redis to power real-time dashboard and web applications, while storing recent trip information in DynamoDB that didn't require real-time processing.

Walkthrough overview

The producer of this solution is AWS IoT Core, which sends messages into a topic called gps. The target database of this solution is ElastiCache for Redis. ElastiCache for Redis is a fast in-memory data store that provides sub-millisecond latency to power internet-scale, real-time applications. Built on open-source Redis and compatible with the Redis APIs, ElastiCache for Redis combines the speed, simplicity, and versatility of open-source Redis with the manageability, security, and scalability from Amazon to power the most demanding real-time applications.

The target location can be either another database or storage, depending on the use case and workload. SOCAR uses Amazon EKS to run the containerized solution to achieve scalability, performance, and flexibility. Amazon EKS is a managed Kubernetes service to run Kubernetes in the AWS Cloud. Amazon EKS automatically manages the availability and scalability of the Kubernetes control plane nodes responsible for scheduling containers, managing application availability, storing cluster data, and other key tasks.

For the programming language, the SOCAR team decided to use the Go programming language, utilizing both the AWS SDK for Go and goroutines, lightweight logical or virtual threads managed by the Go runtime, which make it easy to manage multiple threads. The AWS SDK for Go simplifies the use of AWS services by providing a set of libraries that are consistent and familiar for Go developers.
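As a minimal sketch of why goroutines suit this kind of workload (illustrative only, not SOCAR's code), the following program fans a batch of messages out to concurrent workers with a sync.WaitGroup; the process function here is a hypothetical stand-in for a handler call:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// process stands in for a handler invocation; here it just uppercases the payload.
func process(msg string) string { return strings.ToUpper(msg) }

func main() {
	msgs := []string{"gps", "speed", "fuel"}
	out := make([]string, len(msgs))

	var wg sync.WaitGroup
	for i, m := range msgs {
		wg.Add(1)
		go func(i int, m string) { // one goroutine per message
			defer wg.Done()
			out[i] = process(m) // each goroutine writes to its own slot
		}(i, m)
	}
	wg.Wait() // block until every goroutine finishes

	fmt.Println(out)
}
```

Each goroutine writes to a distinct slice index, so no mutex is needed; the WaitGroup mirrors the shutdown pattern used in the connector's Run loop later in this post.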

In the following sections, we walk through the steps to implement the solution:

  1. Create a consumer.
  2. Create a loader.
  3. Create a handler.
  4. Build a consumer application with the consumer, loader, and handler.
  5. Deploy the consumer application.


Prerequisites

For this walkthrough, you should have the following prerequisites:

Create a consumer

In this example, we use a topic called gps, and the consumer includes a Kafka consumer that receives messages from the topic. SOCAR created a struct and built a constructor (called NewConsumer in the code) to make it extendable. With this approach, any additional parameters and rules can be added easily.

To authenticate with Amazon MSK, SOCAR uses IAM. Because SOCAR already uses IAM to authenticate other resources, such as Amazon EKS, it uses the same IAM role (via aws_msk_iam_v2) to authenticate clients for both Amazon MSK and Apache Kafka actions.

The following code creates the consumer:

 type Consumer struct {
	logger      *zerolog.Logger
	kafkaReader *kafka.Reader
}

func NewConsumer(logger *zerolog.Logger, awsCfg aws.Config, brokers []string, consumerGroupID, topic string) *Consumer {
	return &Consumer{
		logger: logger,
		kafkaReader: kafka.NewReader(kafka.ReaderConfig{
			Dialer: &kafka.Dialer{
				TLS:           &tls.Config{MinVersion: tls.VersionTLS12},
				Timeout:       10 * time.Second,
				DualStack:     true,
				SASLMechanism: aws_msk_iam_v2.NewMechanism(awsCfg),
			},
			Brokers:     brokers,
			GroupID:     consumerGroupID,
			Topic:       topic,
			StartOffset: kafka.LastOffset,
		}),
	}
}

func (consumer *Consumer) Close() error {
	var err error = nil
	if consumer.kafkaReader != nil {
		err = consumer.kafkaReader.Close()
		consumer.logger.Info().Msg("closed kafka reader")
	}
	return err
}

func (consumer *Consumer) Consume(ctx context.Context) (kafka.Message, error) {
	return consumer.kafkaReader.ReadMessage(ctx)
}

Create a loader

The loader, represented by the Loader struct, is responsible for loading messages to the target location, which in this case is ElastiCache for Redis. The NewLoader function initializes a new instance of the Loader struct with a logger and a Redis cluster client, which is used to communicate with the ElastiCache cluster. The redis.ClusterClient object is initialized using the NewRedisClient function, which uses IAM to authenticate the client for Redis actions. This ensures secure and authorized access to the ElastiCache cluster. The Loader struct also includes a Close method to release resources.

The following code creates a loader:

 type Loader struct {
	logger      *zerolog.Logger
	redisClient *redis.ClusterClient
}

func NewLoader(logger *zerolog.Logger, redisClient *redis.ClusterClient) *Loader {
	return &Loader{
		logger:      logger,
		redisClient: redisClient,
	}
}

func (loader *Loader) Close() error {
	var err error = nil
	if loader.redisClient != nil {
		err = loader.redisClient.Close()
		loader.logger.Info().Msg("closed redis client")
	}
	return err
}

// RedisGeoAdd loads a geolocation into Redis; it is referenced by the handler shown later.
func (loader *Loader) RedisGeoAdd(ctx context.Context, key, name string, lng, lat float64) error {
	return loader.redisClient.GeoAdd(ctx, key,
		&redis.GeoLocation{Name: name, Longitude: lng, Latitude: lat}).Err()
}

func NewRedisClient(ctx context.Context, awsCfg aws.Config, addrs []string, replicationGroupID, username string) (*redis.ClusterClient, error) {
	redisClient := redis.NewClusterClient(&redis.ClusterOptions{
		NewClient: func(opt *redis.Options) *redis.Client {
			return redis.NewClient(&redis.Options{
				Addr: opt.Addr,
				CredentialsProvider: func() (username string, password string) {
					token, err := BuildRedisIAMAuthToken(ctx, awsCfg, replicationGroupID, opt.Username)
					if err != nil {
						panic(err)
					}
					return opt.Username, token
				},
				PoolSize:    opt.PoolSize,
				PoolTimeout: opt.PoolTimeout,
				TLSConfig:   &tls.Config{InsecureSkipVerify: true},
			})
		},
		Addrs:       addrs,
		Username:    username,
		PoolSize:    100,
		PoolTimeout: 1 * time.Minute,
	})
	pong, err := redisClient.Ping(ctx).Result()
	if err != nil {
		return nil, err
	}
	if pong != "PONG" {
		return nil, fmt.Errorf("failed to verify connection to redis server")
	}
	return redisClient, nil
}

Create a handler

A handler is used to contain business rules and data transformation logic that prepares data before loading it into the target location. It acts as a bridge between a consumer and a loader. In this example, the topic name is cars.gps.json, and the message contains two keys, lng and lat, with data type Float64. The business logic can be defined in a function like handlerFuncGpsToRedis and then applied as follows:

 type (
	handlerFunc    func(ctx context.Context, loader *Loader, key, value []byte) error
	handlerFuncMap map[string]handlerFunc
)

var HandlerRedis = handlerFuncMap{
	"cars.gps.json": handlerFuncGpsToRedis,
}

func GetHandlerFunc(funcMap handlerFuncMap, topic string) (handlerFunc, error) {
	handlerFunc, exist := funcMap[topic]
	if !exist {
		return nil, fmt.Errorf("failed to find handler func for '%s'", topic)
	}
	return handlerFunc, nil
}

func handlerFuncGpsToRedis(ctx context.Context, loader *Loader, key, value []byte) error {
	// unmarshal raw data into a map
	data := map[string]interface{}{}
	err := json.Unmarshal(value, &data)
	if err != nil {
		return err
	}

	// prepare the object to load into redis as a geolocation
	name := string(key)
	lng, err := getFloat64ValueFromMap(data, "lng")
	if err != nil {
		return err
	}
	lat, err := getFloat64ValueFromMap(data, "lat")
	if err != nil {
		return err
	}

	// add the geolocation to redis
	return loader.RedisGeoAdd(ctx, "cars#gps", name, lng, lat)
}

Build a consumer application with the consumer, loader, and handler

Now you have created the consumer, loader, and handler. The next step is to build a consumer application using them. In a consumer application, you read messages from your stream with a consumer, transform them using a handler, and then load the transformed messages into a target location with a loader. These three components are parameterized in a consumer application function such as the one shown in the following code:

 type Connector struct {
	ctx    context.Context
	logger *zerolog.Logger

	consumer *Consumer
	handler  handlerFuncMap
	loader   *Loader
}

func NewConnector(ctx context.Context, logger *zerolog.Logger, consumer *Consumer, handler handlerFuncMap, loader *Loader) *Connector {
	return &Connector{
		ctx:    ctx,
		logger: logger,

		consumer: consumer,
		handler:  handler,
		loader:   loader,
	}
}

func (connector *Connector) Close() error {
	var err error = nil
	if connector.consumer != nil {
		err = connector.consumer.Close()
	}
	if connector.loader != nil {
		err = connector.loader.Close()
	}
	return err
}

func (connector *Connector) Run() error {
	wg := sync.WaitGroup{}
	defer wg.Wait()
	handlerFunc, err := GetHandlerFunc(connector.handler, connector.consumer.kafkaReader.Config().Topic)
	if err != nil {
		return err
	}
	for {
		msg, err := connector.consumer.Consume(connector.ctx)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				break // shutdown requested
			}
			continue // skip messages that failed to be read
		}

		wg.Add(1)
		go func(key, value []byte) {
			defer wg.Done()
			if err := handlerFunc(connector.ctx, connector.loader, key, value); err != nil {
				connector.logger.Err(err).Msg("")
			}
		}(msg.Key, msg.Value)
	}
	return nil
}

Deploy the consumer application

To achieve maximum parallelism, SOCAR containerizes the consumer application and deploys it into multiple pods on Amazon EKS. Each consumer application contains a distinct consumer, loader, and handler. For example, if you need to receive messages from a single topic with five partitions, you can deploy five identical consumer applications, each running in its own pod. Similarly, if you have two topics with three partitions each, you should deploy two consumer applications, resulting in a total of six pods. It's a best practice to run one consumer application per topic, and the number of pods should match the number of partitions to enable concurrent message processing. The pod count can be specified in the Kubernetes deployment configuration.
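The partition-to-pod rule above can be made explicit with a small hypothetical helper (not part of SOCAR's code): one deployment per topic, with replicas equal to that topic's partition count.

```go
package main

import "fmt"

// replicasPerTopic maps each topic to the replica count of its deployment,
// following the rule: pods per topic == partitions per topic.
// It also returns the total number of pods across all deployments.
func replicasPerTopic(partitions map[string]int) (map[string]int, int) {
	replicas := map[string]int{}
	total := 0
	for topic, p := range partitions {
		replicas[topic] = p
		total += p
	}
	return replicas, total
}

func main() {
	// two hypothetical topics with three partitions each -> six pods in total
	_, total := replicasPerTopic(map[string]int{
		"cars.gps.json":  3,
		"cars.trip.json": 3,
	})
	fmt.Println(total)
}
```

Matching replicas to partitions avoids idle pods (a Kafka partition is consumed by at most one member of a consumer group) while still saturating every partition with a dedicated worker.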

There are two stages in the Dockerfile. The first stage is the builder, which installs build tools and dependencies, and builds the application. The second stage is the runner, which uses a smaller base image (Alpine) and copies only the necessary files from the builder stage. It also sets the appropriate user permissions and runs the application. It's worth noting that the builder stage uses a specific version of the Golang image, and the runner stage uses a specific version of the Alpine image, both of which are considered lightweight and secure images.

The following code is an example of the Dockerfile:

 # builder
FROM golang:1.18.2-alpine3.16 AS builder
RUN apk add build-base
WORKDIR /usr/src/app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o connector

# runner
FROM alpine:3.16.0 AS runner
WORKDIR /usr/bin/app
RUN apk add --no-cache tzdata
RUN addgroup --system app && adduser --system --shell /bin/false --ingroup app app
COPY --from=builder /usr/src/app/connector .
RUN chown -R app:app /usr/bin/app
USER app
ENTRYPOINT ["/usr/bin/app/connector"]


Conclusion

In this post, we discussed SOCAR's approach to building a consumer application that enables real-time IoT streaming from Amazon MSK to target locations such as ElastiCache for Redis. We hope you found this post informative and useful. Thank you for reading!

About the Authors

SangSu Park is the Head of Operation Group at SOCAR. His passion is to keep learning, embrace challenges, and pursue mutual growth through communication. He loves to travel in search of new cities and places.

JaeHong Ahn is a DevOps Engineer in SOCAR's cloud infrastructure team. He is dedicated to fostering collaboration between developers and operators. He enjoys building DevOps tools and is committed to using his coding abilities to help build a better world. He loves to cook delicious meals as a private chef for his wife.

Younggu Yun works at the AWS Data Lab in Korea. His role involves helping customers across the APAC region meet their business objectives and overcome technical challenges by providing prescriptive architectural guidance, sharing best practices, and building innovative solutions together.
