Introduction to Apache Kafka

Apache Kafka

What is Apache Kafka?

Apache Kafka is a distributed streaming platform that lets clients publish and subscribe to streams of records. Seen another way, it is an enterprise messaging system. It is fast, horizontally scalable, and fault tolerant. Kafka has four core APIs:

Producer API: 

This API allows clients to connect to the Kafka servers running in a cluster and publish streams of records to one or more Kafka topics.

Consumer API:

This API allows clients to connect to the Kafka servers running in a cluster and consume streams of records from one or more Kafka topics. Kafka consumers pull messages from topics; the brokers do not push messages to them.

Streams API:

This API allows clients to act as stream processors: they consume streams from one or more input topics, transform them, and produce the results to one or more output topics.

Connector API:

This API allows us to write reusable producer and consumer code. For example, we may want to read data from an RDBMS and publish it to a topic, or consume data from a topic and write it to an RDBMS. With the Connector API we can build reusable source and sink connectors for various data sources.

What is Kafka used for?

Kafka is commonly used for the following use cases:

Messaging System:

Kafka is used as an enterprise messaging system to decouple source and target systems that exchange data. Compared to JMS, Kafka provides high throughput through partitions and fault tolerance through replication.


Apache Kafka Messaging System

Web Activity Tracking:

Kafka is used to track user journey events on a website for analytics and offline data processing.

Log Aggregation:

Kafka is used to process logs from various systems. This matters most in distributed environments such as micro services architectures, where the systems are deployed on many hosts: we need to aggregate the logs from all of them and make them available in a central place for analysis. For an architecture that uses Kafka this way, see the article on distributed logging architecture: https://smarttechie.org/2017/07/31/distributed-logging-architecture-for-micro-services/

Metrics Collector:

Kafka is used to collect metrics from various systems and networks for operations monitoring. Kafka metrics reporters are available for monitoring tools such as Ganglia and Graphite.

For one example, see https://github.com/stealthly/metrics-kafka

What is a broker?

An instance in a Kafka cluster is called a broker. If you connect to any one broker in a cluster, you can access the entire cluster. The broker we connect to in order to reach the cluster is also known as the bootstrap server. Each broker is identified by a numeric id within the cluster. Three brokers is a good number to start a cluster with, but some clusters have hundreds of brokers.

What is a topic?

A topic is a logical name to which records are published. Internally, a topic is divided into partitions, and the data is published to those partitions. The partitions are distributed across the brokers in the cluster. For example, if a topic has three partitions and the cluster has three brokers, each broker holds one partition. Data published to a partition is append-only, and each appended record gets the next offset.

Topic Partitions

Below are a few points to remember while working with partitions.

  • A topic is identified by its name. We can have many topics in a cluster.
  • The order of messages is maintained at the partition level, not across the topic.
  • Once data is written to a partition, it is not overwritten. This is called immutability.
  • Messages in a partition are stored with a key, a value, and a timestamp. For a given key, Kafka publishes messages to the same partition (with the default partitioner, as long as the number of partitions does not change).
  • Each partition has a leader broker in the cluster, which handles the read and write operations for that partition.
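The key-to-partition rule above can be sketched in a few lines. This is an illustration only, not Kafka's code: Kafka's default partitioner hashes the serialized key with murmur2, while the sketch below substitutes Arrays.hashCode to stay short. The class name, method name and sample keys are made up for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative stand-in for key-based partitioning. Kafka's default
// partitioner uses a murmur2 hash; Arrays.hashCode is an assumption
// made here to keep the sketch self-contained.
final class KeyPartitionSketch {

    // Maps a record key to a partition index in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Clear the sign bit so the modulo result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3;
        // "order-1" appears twice and lands on the same partition both times.
        for (String key : new String[] {"order-1", "order-2", "order-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Because the partition index depends on the partition count, adding partitions to a topic later can move a key to a different partition.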

Apache Kafka Partitions

In the above example, I have created a topic with three partitions and a replication factor of 3. As the cluster has 3 brokers, the three partitions are evenly distributed, and each partition is replicated to the other 2 brokers. With a replication factor of 3, no data is lost even if 2 brokers go down. Always keep the replication factor greater than 1 and less than or equal to the number of brokers in the cluster. You cannot create a topic with a replication factor greater than the number of brokers in the cluster.

In the above diagram, each partition has a leader (the glowing partition), and the other in-sync replicas (the grayed-out partitions) are followers. For partition 0, broker-1 is the leader and broker-2 and broker-3 are followers. All reads and writes for partition 0 go to broker-1, and the data is then copied to broker-2 and broker-3.
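To make the leader and follower layout concrete, here is a simplified round-robin assignment. It is a sketch under assumptions, not Kafka's actual replica assignment algorithm; the class and method names are hypothetical. It does enforce the rule that the replication factor cannot exceed the broker count.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical replica placement: the first broker in the
// returned list is treated as the leader, the rest as followers.
final class ReplicaAssignmentSketch {

    // Returns the 1-based broker ids that hold the given partition.
    static List<Integer> replicasFor(int partition, int brokerCount, int replicationFactor) {
        if (replicationFactor < 1 || replicationFactor > brokerCount) {
            throw new IllegalArgumentException(
                "replication factor must be between 1 and the number of brokers");
        }
        List<Integer> brokers = new ArrayList<>();
        for (int i = 0; i < replicationFactor; i++) {
            // Walk the brokers round-robin, starting at the partition's own slot.
            brokers.add((partition + i) % brokerCount + 1);
        }
        return brokers;
    }

    public static void main(String[] args) {
        for (int p = 0; p < 3; p++) {
            List<Integer> replicas = replicasFor(p, 3, 3);
            System.out.println("partition " + p + ": leader=broker-" + replicas.get(0)
                + ", replicas=" + replicas);
        }
    }
}
```

With 3 brokers and a replication factor of 3, every broker holds a copy of every partition, which is why losing 2 brokers still leaves one complete copy.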

Now let us create a Kafka cluster with 3 brokers by following the steps below.

Step 1:

Download the latest version of Apache Kafka. In this example I am using 1.0, which is the latest at the time of writing. Extract the archive and move into the bin folder. Start Zookeeper first, as it is required to run a Kafka cluster. Zookeeper is the coordination service that manages the brokers, handles leader election for partitions, and alerts Kafka about changes to topics (topic created, topic deleted, etc.) or brokers (broker added, broker died, etc.). In this example I have started only one Zookeeper instance. In production environments we should run more Zookeeper instances to handle failover. In this version, a Kafka cluster cannot work without Zookeeper.

Step 2:

Now start the Kafka brokers. In this example we are going to start three brokers. Go to the config folder under the Kafka root, copy the server.properties file 3 times, and name the copies server_1.properties, server_2.properties and server_3.properties. In each file, set a unique broker.id; when the brokers run on the same host, also give each its own listener port and log.dirs directory.

Now start the 3 brokers by running the kafka-server-start script once per properties file.

Step 3:

Create the topic with the kafka-topics script.

Step 4:

Produce some messages to the topic created in the above step by using the Kafka console producer. For the console producer, specify the address of any one of the brokers. That broker will be the bootstrap server used to gain access to the entire cluster.

Step 5:

Consume the messages using the Kafka console consumer. For the consumer, again specify the address of any one of the brokers as the bootstrap server. Remember that you may not see the messages in the order they were produced, because order is maintained at the partition level, not at the topic level.

If you want, you can describe the topic to see how the partitions are distributed and which broker is the leader of each partition.

In the above description, broker-1 is the leader for partition 0, and broker-1, broker-2 and broker-3 hold replicas of each partition.

In the next article we will see the producer and consumer Java APIs. Till then, Happy Messaging!!!


Enterprise Application Monitoring in production with OverOps

In this article we will discuss OverOps, which monitors an application and provides insights about its exceptions, including the code and the variable state that caused each exception. With traditional logging through Splunk, ELK or any other log aggregation tool, we capture the exception stack trace to troubleshoot an issue. But with the stack trace alone, finding and fixing the root cause is hard and time consuming. If you attach the OverOps agent to the application, then along with the exception it provides the source code where exactly the exception happened, plus the variable state and JVM state at that time. OverOps supports the below platforms.

  • Java
  • Scala
  • Clojure
  • .NET

It also integrates with existing log and performance monitoring tools such as Splunk, ELK, NewRelic and AppDynamics.

In this article I will show you how to configure OverOps for a standalone Java application. You can get a trial version of the OverOps agent for your operating system by registering. We can go with either the on-premise or the SaaS-based solution. To demonstrate this, I have created a sample Spring Boot application and launched the jar with the OverOps agent, which monitors the exceptions the application throws based on a certain business rule. In an enterprise application, by contrast, the business logic will be critical and run time exceptions will be raised unpredictably.


java -agentlib:TakipiAgent -jar Sample_Over_Ops-0.0.1-SNAPSHOT.jar

When I accessed the REST end point of the above application, it generated exceptions, and these were captured in the OverOps dashboard as shown below.

OverOps Dashboard

The sample application is available here.

Happy Monitoring!!


Distributed Logging Architecture for Microservices

Micro Services

In this article we will see the best practices to follow while logging in micro services, and an architecture for handling distributed logging in the micro services world. As we all know, micro services run on multiple hosts. To fulfill a single business requirement, we might need to talk to multiple services running on different machines, so the log messages generated by the micro services are distributed across multiple hosts. As a developer or administrator trying to troubleshoot an issue, you are clueless, because you don’t know which host’s micro service served your request. Even if you know which hosts served your request, going to each host, grepping the logs and correlating them across all the micro service requests is a cumbersome process. If your environment is auto scaled, troubleshooting becomes even harder. Here are some practices that make it easier to troubleshoot issues in the micro services world.

  • Centralize and externalize storage of your logs

    As the micro services run on multiple hosts, send all the logs generated across the hosts to an external, centralized place. From there you can easily get the log information in one place. It might be another physical system which is highly available, an S3 bucket, or some other storage. If you are hosting your environment on AWS you can leverage CloudWatch; other cloud providers offer comparable services.

  • Log structured data

    Generally we write log messages as raw text to log files. There are log encoders available which emit JSON log messages instead. Add all the necessary fields to the log, so that the right data is available to troubleshoot any issue. Below are some useful links for configuring JSON appenders.

         https://logging.apache.org/log4j/2.x/manual/layouts.html

         https://github.com/qos-ch/logback-contrib/wiki/JSON

 If you are using Logstash as the log aggregation tool, there are encoders which you can configure to output JSON log messages.
https://github.com/logstash/logstash-logback-encoder

https://github.com/logstash/log4j-jsonevent-layout

  • Generate a correlation Id, pass the same correlation Id to the downstream services, and return the correlation Id as part of the response

 Generate a correlation Id when making the first micro service call and pass the same correlation Id to the downstream services. Log the correlation Id in all the micro service calls. We can then use the correlation Id returned in the response to trace the logs.

If you are using Spring Cloud to develop micro services, you can use the Spring Cloud Sleuth module along with Zipkin.

  • Allow the logging level to be changed dynamically and use asynchronous logging

We will be using different log levels in the code, with enough logging statements. If we can change the log level dynamically, we can enable the appropriate level when we need it. This way we do not need to enable the most verbose logging level at server startup to print all the logs, and we avoid the overhead of excessive logging. Add asynchronous log appenders, so that the request thread is not blocked by logging. If you are using Spring Cloud, use Spring Boot Admin to change the log level dynamically.

  • Make logs searchable

Make all the fields available in the logs searchable. For example, if you get hold of a correlation Id, you can search all the logs by that correlation Id to find the request flow.
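Two of the practices above, structured logs and correlation Ids, can be sketched together in plain Java. This is a minimal illustration under assumptions: the header name X-Correlation-Id, the service names and the hand-rolled JSON writer are all hypothetical, and a real service would rely on a JSON log encoder and a tracing library such as the ones linked above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

final class CorrelationIdSketch {

    // Hypothetical header name; use whatever your services agree on.
    static final String HEADER = "X-Correlation-Id";

    // Reuses the caller's correlation id when present, otherwise creates one.
    static String resolveCorrelationId(Map<String, String> requestHeaders) {
        String incoming = requestHeaders.get(HEADER);
        return (incoming == null || incoming.isEmpty())
            ? UUID.randomUUID().toString()
            : incoming;
    }

    // Builds one JSON log line by hand. Only backslashes and double quotes
    // are escaped, which is enough for this sketch but not for production.
    static String jsonLogLine(String service, String correlationId, String message) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("service", service);
        fields.put("correlationId", correlationId);
        fields.put("message", message);
        StringBuilder json = new StringBuilder("{");
        for (Map.Entry<String, String> field : fields.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            String escaped = field.getValue().replace("\\", "\\\\").replace("\"", "\\\"");
            json.append('"').append(field.getKey()).append("\":\"").append(escaped).append('"');
        }
        return json.append('}').toString();
    }

    public static void main(String[] args) {
        // First service in the chain: no incoming id, so one is generated.
        String id = resolveCorrelationId(new LinkedHashMap<String, String>());
        System.out.println(jsonLogLine("order-service", id, "order received"));

        // Downstream service: the id arrives in the request headers and is reused.
        Map<String, String> downstreamHeaders = new LinkedHashMap<>();
        downstreamHeaders.put(HEADER, id);
        System.out.println(jsonLogLine("customer-service",
            resolveCorrelationId(downstreamHeaders), "customer looked up"));
    }
}
```

Both log lines carry the same correlationId field, so a search on that one value returns the whole request flow across services.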

Now we will see the architecture of log management in the micro services world. This solution uses the ELK stack. Generally we will have different log configurations for different environments. For the development environment we will go with console appenders or file appenders, which output the logs on the local host. This is easy and convenient during development. For other environments we will send the logs to a centralized place. The architecture we are going to discuss is for QA and higher environments.

Distributed Logging Architecture

In the above architecture we configured a Kafka log appender to output the log messages to a Kafka cluster. From the Kafka cluster the messages are ingested into Logstash. While ingesting the log messages into Logstash we can transform the information as required. The output of Logstash is stashed in Elasticsearch. Using the Kibana visualization tool we can search the indexed logs by the parameters we logged. Remember that we can use message brokers such as RabbitMQ or ActiveMQ instead of Kafka. Below are some useful links on appenders.

https://github.com/danielwegener/logback-kafka-appender

http://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/logback/AmqpAppender.html

https://logging.apache.org/log4j/2.0/manual/appenders.html#KafkaAppender

https://logging.apache.org/log4j/2.0/manual/appenders.html#JMSAppender

In the second option given below, we write the log messages to a file on the host machine using the Logstash appender. The Filebeat agent watches the log files and ingests the log information into the Logstash cluster.

Distributed Logging Architecture

Between the first and second options, my choice is the first option. Below are my justifications.

  • If the system is highly scalable with auto scaling, instances are created and destroyed based on need. In that case, with the second option log files might be lost when a host is destroyed. With the first option, each message goes to the middleware as soon as we log it. It is a perfect fit for auto scaling environments.
  • With the second option we install Filebeat or a similar file watcher on the host machine. If for some reason those agents stop working, we may not get the logs from those hosts. Again we lose log information.

In the coming articles we will discuss more topics on micro services. Till then stay tuned!!!


Spring Boot Admin – Admin UI for administration of spring boot applications

As part of micro services development, many of us use Spring Boot along with Spring Cloud features. In the micro services world we have many Spring Boot applications running on the same or different hosts. If we add Spring Boot Actuator to these applications, we get a lot of out-of-the-box end points to monitor and interact with them. The list is given below.

ID | Description | Sensitive Default
actuator | Provides a hypermedia-based “discovery page” for the other endpoints. Requires Spring HATEOAS to be on the classpath. | true
auditevents | Exposes audit events information for the current application. | true
autoconfig | Displays an auto-configuration report showing all auto-configuration candidates and the reason why they ‘were’ or ‘were not’ applied. | true
beans | Displays a complete list of all the Spring beans in your application. | true
configprops | Displays a collated list of all @ConfigurationProperties. | true
dump | Performs a thread dump. | true
env | Exposes properties from Spring’s ConfigurableEnvironment. | true
flyway | Shows any Flyway database migrations that have been applied. | true
health | Shows application health information (when the application is secure, a simple ‘status’ when accessed over an unauthenticated connection or full message details when authenticated). | false
info | Displays arbitrary application info. | false
loggers | Shows and modifies the configuration of loggers in the application. | true
liquibase | Shows any Liquibase database migrations that have been applied. | true
metrics | Shows ‘metrics’ information for the current application. | true
mappings | Displays a collated list of all @RequestMapping paths. | true
shutdown | Allows the application to be gracefully shutdown (not enabled by default). | true
trace | Displays trace information (by default the last 100 HTTP requests). | true

The above end points provide a lot of insight into a Spring Boot application. But if you have many applications running, monitoring each one by hitting the end points and inspecting the JSON responses is a tedious process. To avoid this hassle, the codecentric team came up with the Spring Boot Admin module, which provides an admin UI dashboard to administer Spring Boot applications. This module crunches the data from the Actuator end points and provides insights about all the registered applications in a single dashboard. We will demonstrate the Spring Boot Admin features in the following sections.

As a first step, create a Spring Boot application and turn it into the Spring Boot Admin server module by adding the below maven dependencies.

Add the Spring Boot Admin Server configuration by adding @EnableAdminServer to your configuration.

Let us create more Spring Boot applications to monitor via the Spring Boot Admin server created in the above steps. All the Spring Boot applications we create now will act as Spring Boot Admin clients. To make an application an Admin client, add the below dependency along with the actuator dependency. In this demo I have created three applications: Eureka Server, Customer Service and Order Service.

Add the below property to the application.properties file. This property tells the client where the Spring Boot Admin server is running, so that the client can register with the server.

Now if we start the Admin Server and the other Spring Boot applications, we can see all the admin clients’ information in the Admin Server dashboard. As we started our admin server on port 1111 in this example, we can see the dashboard at http://<host_name>:1111. Below is the screenshot of the Admin Server UI.

A detailed view of an application is given below. In this view we can see the tail of the log file, metrics, environment variables, the log configuration (where we can dynamically switch log levels at the component, root or package level) and other information.

Now we will see another Spring Boot Admin feature called notifications. This notifies the administrators when an application’s status goes DOWN or comes back UP. Spring Boot Admin supports the below channels to notify the user.

  • Email Notifications
  • PagerDuty Notifications
  • HipChat Notifications
  • Slack Notifications
  • Let’s Chat Notifications

In this article we will configure Slack notifications. Add the below properties to the Spring Boot Admin Server’s application.properties file.

With Spring Boot Admin we are managing all the applications, so we need to secure the Spring Boot Admin UI with a login. Let us enable login on the Spring Boot Admin server. Here I am going with basic authentication. Add the below maven dependencies to the Admin Server module.

Add the below properties to the application.properties file.

As we added security to the Admin Server, the Admin clients must authenticate to connect to the server. Hence add the below properties to the Admin clients’ application.properties files.

There are additional UI features, like the Hystrix and Turbine UIs, which we can enable in the dashboard. You can find more details here. The sample code created for this demonstration is available on Github.


HTTP/2 multiplexing and server push

Smart Techie

In this article we will see the main features of the HTTP/2 specification. In HTTP/1.0, request and response processing between the client and server is strictly sequential: the client sends a request, the server processes it and sends the response back, and only then does the client send the next request. If any request is blocked, all the requests behind it suffer. HTTP/1.1 tackled this by introducing request pipelining. With pipelining, the client sends several requests in order without waiting for each response; the server processes them and sends the responses back in the same order. Because responses must still come back in order, a slow response continues to hold up the ones behind it. The below diagram depicts the client server communication in HTTP/1.0 and HTTP/1.1.

http/1 request processing

Up to HTTP/1.1, requests and responses are composed in text format, and clients use multiple TCP connections per origin. HTTP/2 addresses these issues: the multiple TCP connections per origin, the text format, and the one-at-a-time communication. Now we will see how HTTP/2 processes requests and responses.

http2 request processing

HTTP/2 uses a binary protocol to exchange data. It opens a single TCP connection per origin and uses that connection to process multiple requests. Each request is associated with a stream and is divided into multiple frames. Each frame carries the identifier of the stream it belongs to. The client sends frames belonging to multiple streams to the server asynchronously; the server processes them and sends response frames back asynchronously. The client assembles each response based on the stream identifier. Here the client and server communicate simultaneously without blocking.
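The framing idea can be illustrated with a toy model. The sketch below is not an HTTP/2 implementation: real frames are binary and carry a type, flags and a length, while here a frame is only a stream id plus a text fragment. The class names, frame size and sample requests are assumptions for the example. Stream ids 1 and 3 are used because client-initiated HTTP/2 streams have odd identifiers.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class Http2MultiplexSketch {

    // Drastically simplified frame: a stream id and a text fragment.
    static final class Frame {
        final int streamId;
        final String payload;

        Frame(int streamId, String payload) {
            this.streamId = streamId;
            this.payload = payload;
        }
    }

    // Splits a message into frames of at most frameSize characters,
    // all tagged with the same stream id.
    static List<Frame> toFrames(int streamId, String message, int frameSize) {
        List<Frame> frames = new ArrayList<>();
        for (int i = 0; i < message.length(); i += frameSize) {
            int end = Math.min(message.length(), i + frameSize);
            frames.add(new Frame(streamId, message.substring(i, end)));
        }
        return frames;
    }

    // Interleaves the frames of several streams round-robin, the way they
    // might share a single connection.
    static List<Frame> interleave(List<List<Frame>> streams) {
        List<Frame> wire = new ArrayList<>();
        boolean added = true;
        for (int i = 0; added; i++) {
            added = false;
            for (List<Frame> stream : streams) {
                if (i < stream.size()) {
                    wire.add(stream.get(i));
                    added = true;
                }
            }
        }
        return wire;
    }

    // Rebuilds each stream's message from interleaved frames by stream id.
    static Map<Integer, String> reassemble(List<Frame> wire) {
        Map<Integer, StringBuilder> buffers = new LinkedHashMap<>();
        for (Frame frame : wire) {
            buffers.computeIfAbsent(frame.streamId, id -> new StringBuilder())
                   .append(frame.payload);
        }
        Map<Integer, String> messages = new LinkedHashMap<>();
        buffers.forEach((id, buffer) -> messages.put(id, buffer.toString()));
        return messages;
    }

    public static void main(String[] args) {
        List<List<Frame>> streams = new ArrayList<>();
        streams.add(toFrames(1, "GET /index.html", 4));
        streams.add(toFrames(3, "GET /app.js", 4));
        List<Frame> wire = interleave(streams);
        for (Frame frame : wire) {
            System.out.println("stream " + frame.streamId + ": " + frame.payload);
        }
        System.out.println(reassemble(wire));
    }
}
```

The frames of the two requests are mixed on the wire, yet each message is recovered intact because every frame names its stream.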

Another HTTP/2 feature is server push. When the client requests a resource, the server can push additional resources along with the requested one, so that they are cached at the client side. This improves performance because the client cache is already warmed up with content the client is likely to need.

http/2 server push

To know further about Http/2 go through the below links.

https://http2.github.io/

https://tools.ietf.org/html/rfc7540

http://royal.pingdom.com/2015/06/11/http2-new-protocol/


Java 9 : Convenience Factory Methods to create immutable Collections

Java 9

In this article we will see another JDK 9 feature: convenience factory methods to create immutable collections. Up to Java 8, to create an immutable collection we used to call the unmodifiableXXX() methods of the java.util.Collections class. For example, to create an unmodifiable list we had to build a mutable list, add the elements, and then wrap it with Collections.unmodifiableList().
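A minimal sketch of the pre-Java 9 approach, with placeholder values and a hypothetical class name:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class UnmodifiableListJava8 {

    // Builds a mutable list first, then wraps it in an unmodifiable view.
    static List<String> colors() {
        List<String> colors = new ArrayList<>();
        colors.add("red");
        colors.add("green");
        colors.add("blue");
        return Collections.unmodifiableList(colors);
    }

    public static void main(String[] args) {
        List<String> colors = colors();
        System.out.println(colors);
        try {
            colors.add("yellow");
        } catch (UnsupportedOperationException e) {
            System.out.println("the list cannot be modified");
        }
    }
}
```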

This approach is too verbose for creating a simple unmodifiable List. As Java is adopting a more functional programming style, Java 9 introduced, with JEP 269, convenient and more compact factory methods to create unmodifiable collections. Let us see how that works.

Create Empty List:
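A minimal sketch, assuming Java 9 or later; the class name is hypothetical:

```java
import java.util.List;

final class EmptyListJava9 {

    // List.of() with no arguments returns an empty, unmodifiable list.
    static List<String> emptyList() {
        return List.of();
    }

    public static void main(String[] args) {
        System.out.println("size = " + emptyList().size());
    }
}
```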

Create Non-Empty List:
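A minimal sketch with placeholder values, assuming Java 9 or later. Note that List.of rejects null elements with a NullPointerException.

```java
import java.util.List;

final class NonEmptyListJava9 {

    // One expression replaces the build-then-wrap steps needed before Java 9.
    static List<String> colors() {
        return List.of("red", "green", "blue");
    }

    public static void main(String[] args) {
        System.out.println(colors());
    }
}
```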

Create Non-Empty Map:
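A minimal sketch with placeholder keys and values, assuming Java 9 or later. Map.of takes alternating keys and values for up to ten entries; for larger maps, Map.ofEntries is available.

```java
import java.util.Map;

final class NonEmptyMapJava9 {

    // Keys and values alternate: key1, value1, key2, value2, ...
    static Map<String, Integer> stock() {
        return Map.of("apples", 3, "pears", 5);
    }

    public static void main(String[] args) {
        Map<String, Integer> stock = stock();
        System.out.println("apples = " + stock.get("apples"));
        System.out.println("pears = " + stock.get("pears"));
    }
}
```

Duplicate keys passed to Map.of cause an IllegalArgumentException, and the iteration order of the resulting map is unspecified.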

If you look at the Java 9 factory methods, the code to create immutable collections is a simple one-liner. In the coming article we will see another Java 9 feature. Till then, Stay Tuned!!!


jshell: The Java Shell (Read-Eval-Print Loop)

Java 9

In this article we will discuss jshell (the Java Shell), a Java 9 feature. We can explore jshell with the JDK 9 Early Access Release. As of now, the general availability of JDK 9 is scheduled for 27th July, 2017. The jshell feature is proposed as part of JEP 222. The motivation behind jshell is to provide an interactive command line tool to explore the features of Java quickly. It is a very useful tool for new learners to get a glimpse of Java features. Java is already incorporating functional programming features from Scala. In the same direction, Java now gets a REPL (Read-Eval-Print Loop) interactive shell, like those of Scala, Ruby, JavaScript, Haskell, Clojure, and Python.

The jshell tool is a command-line tool with features such as a history of statements with editing, tab completion, automatic addition of needed terminal semicolons, and configurable predefined imports.

After downloading JDK 9, set the PATH variable to access jshell. Now, we will see how to use jshell. Below is a simple program using jshell. We do not need to write a class with a public static void main(String[] args) method to run a simple hello world application.

Now we will write a method which adds two variables and invoke it via jshell.

Now, we will create a static method that uses the StringBuilder class without importing it. StringBuilder lives in java.lang, which is always imported; jshell additionally pre-imports several common packages, such as java.util, by default.

I hope you enjoyed the jshell feature. In the next article we will see another JDK 9 feature. Till then, Stay Tuned!!!
