2022
Udemy - Apache Kafka Series - Kafka Streams for Data Processing
https://www.udemy.com/course/kafka-streams/
magnet:?xt=urn:btih:302b50a873ad4f997158ecad2f79f4f3e92c19f0
!!! 895m
Apache Kafka Series - Learn Apache Kafka for Beginners v3
https://www.udemy.com/course/apache-kafka/
https://www.oreilly.com/library/view/apache-kafka-series/9781789342604/
Udemy - Maarek - Apache Kafka Series - Learn Apache Kafka for Beginners v3
magnet:?xt=urn:btih:b9bd6bbfb5193f69128c6c34db379821a58d4de2
magnet:?xt=urn:btih:1485c91b36e8d099640dc6180dc42fd0e4a8997e
magnet:?xt=urn:btih:6f08ccb2d05668fb929dded3c8a5744d5976e7e1
https://freecoursesite.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-37/
https://course-downloader.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-4463
! 3.11g
????
Maarek - Apache Kafka Series – Learn Apache Kafka for Beginners v2
magnet:?xt=urn:btih:d18a88e411d34c627db87450b5631eeaed6cc193
magnet:?xt=urn:btih:b2f4b134238fb62b1cb06182178c348efe550cb5
magnet:?xt=urn:btih:ab0be03d946ff2cf02d1f078ec1bcb43f6a884f0
magnet:?xt=urn:btih:47dd77a975720aa74d108a7be5ef0ef100a8f55b
magnet:?xt=urn:btih:15b04ee6ab7bc26708ff8d59f65526bd13d822e4
magnet:?xt=urn:btih:e67fcb091584d050ba794c51e50cf3360682149d
magnet:?xt=urn:btih:f33d7bf93e72baf68e50bb5c0cdb055d395729fe
magnet:?xt=urn:btih:96e54381ee4af7b2c40e81c50143e94b759a99fa
https://tutsnode.net/apache-kafka-series-learn-apache-kafka-for-beginners-v2/
https://course-downloader.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-4425
https://github.com/conduktor/kafka-beginners-course
https://github.com/conduktor/kafka-stack-docker-compose
https://raw.githubusercontent.com/conduktor/kafka-stack-docker-compose/master/zk-single-kafka-single.yml
https://github.com/conduktor/conduktor-platform
https://github.com/conduktor/conduktor-platform/tree/main/example-local
! 3.16-10g
Udemy - Apache Kafka Series - Confluent Schema Registry & REST Proxy
https://www.udemy.com/course/confluent-schema-registry/
magnet:?xt=urn:btih:d67f45d8e3240f47a8efb5650b5b1ad46dde5ede
! 884.6m, 4h30m, interesting docker-compose
landoop/fast-data-dev:cp3.3.0
????
2019
Maarek - Learn Apache Kafka for Beginners
! 7h32m
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/intro-to-apache-kafka
https://github.com/simplesteph
https://medium.com/@stephane.maarek
TBD:
* Kafka Producer Metrics
https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/
https://medium.com/@aiiflo/kafka-producer-metrics-monitoring-key-metrics-aa25bafefcea
https://docs.confluent.io/platform/current/kafka/monitoring.html#global-request-metrics
batch-size-avg
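a minimal sketch (not from the course) of reading batch-size-avg programmatically; assumes an already-created KafkaProducer named "producer" and the standard "producer-metrics" group from the Confluent docs above:
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
...
Map<MetricName, ? extends Metric> metrics = producer.metrics();
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
MetricName name = entry.getKey();
if ("batch-size-avg".equals(name.name()) && "producer-metrics".equals(name.group())) {
// metricValue() returns the current value of the metric (a Double here)
logger.info("batch-size-avg = " + entry.getValue().metricValue());
}
}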
theory
broker (each broker is also a bootstrap server)
has an integer ID
contains certain topic partitions, but not all of them
after connecting to any broker (called a bootstrap broker), a client is connected to the entire cluster
each broker knows all the cluster metadata (which topics and partitions exist and where)
topics
is a particular stream of data
partitions
is ordered; ordering is guaranteed only within a partition
data is kept only for a limited amount of time (default: 1 week)
data is immutable
offsets
incremental id (infinite)
replication factor
3 is typical; 2 is risky
leader
at any time only ONE broker can be a Leader for a given partition
only that leader can receive and serve data for a partition
the other brokers will synchronize data - ISR (in-sync replica)
send-mode, ACK
producer can choose to receive ACK for data writes (the acks setting)
acks=0 , no ack, dangerous
=1 , wait only for Leader to ack, limited data loss
=all, wait for all replicas, no data lost
key of the message
key=null -> round-robin
all the messages with the same key go to the same partition
consumers
read data in order "within each partition"
consumer-group
each consumer within a group reads from exclusive partitions
if there are more consumers than partitions, some consumers will be inactive
each group has a "generation" and rebalances consumers whenever a consumer joins or leaves the group
!!! by using a new fresh consumer group, it is possible to read all the messages "again"
consumer-offsets
kafka stores the offsets at which a consumer group has been reading
these offsets are committed live in a Kafka topic "__consumer_offsets"
these offsets give rise to the notion of "delivery semantics"
delivery semantics
- at most once (offset is committed as soon as the message is received)
- at least once (offset is committed only after message has been processed)
the processing system needs to be idempotent in this case
- exactly once (can be achieved for Kafka => Kafka workflows using the Streams API)
zookeeper
in PROD, runs as an ensemble with an odd number of zookeeper servers (3, 5, 7)
one is the Leader, the others are Followers
ZooKeeper no longer stores consumer offsets starting from Kafka v0.10
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/intro-to-apache-kafka
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/apache-kafka-in-five-minutes
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-theory-overview
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/topics-partitions-and-offsets
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/brokers-and-topics
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/topic-replication
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producers-and-message-keys
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-and-consumer-group
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-offsets-and-delivery-semantics
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-broker-discovery
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/zookeeper
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-guarantees
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/macos-download-and-set-up-kafka-in-path
...
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-topics-cli
kafka-topics
common-stuff:
--zookeeper <host>:2181
--list
list all existing topics
--create
--topic <topic-name>
--partitions 3
--replication-factor 2 (must not be greater than the number of available brokers)
--describe
good table of partitions
--delete
mark a topic for deletion (does not delete it right away)
(delete.topic.enable=true)
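example invocations assembling the flags above (host and topic names are placeholders; Kafka 2.2+ takes --bootstrap-server instead of --zookeeper):
$ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic first_topic --partitions 3 --replication-factor 1
$ kafka-topics --zookeeper 127.0.0.1:2181 --list
$ kafka-topics --zookeeper 127.0.0.1:2181 --describe --topic first_topic
$ kafka-topics --zookeeper 127.0.0.1:2181 --delete --topic first_topic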
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-console-producer-cli
kafka-console-producer
common-stuff:
--broker-list <host>:9092
--producer-property
acks=all
--topic <topic-name>
line-by-line till Ctrl-C
Note: new topic is created if needed (LEADER_NOT_AVAILABLE WARN)
with default num of partitions and repl-factor
config/server.properties
num.partitions=1 # default
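example (host and topic name are placeholders):
$ kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all
> hello
> world
^C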
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-console-consumer-cli
kafka-console-consumer
common-stuff:
--bootstrap-server <host>:9092
--topic <topic-name>
--from-beginning
!!! by default, this consumer only reads new messages (from the start moment)
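example (host and topic name are placeholders):
$ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning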
...
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-consumers-in-groups
kafka-console-consumer
...
--group <consumer-group-name>
to be part of a consumer group
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-consumer-groups-cli
kafka-consumer-groups
common-stuff:
--bootstrap-server <host>:9092
--list
... console-consumer-<id>
--describe --group <group-name>
... LAG 0 -> meaning the console consumer has read all the data
...
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/resetting-offsets
kafka-consumer-groups
...
--reset-offsets
--dry-run|--execute
--by-duration
--to-earliest
... scopes...
--all-topics
--topic <topic-name>
--shift-by <num>
--describe
--group <group-name>
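example invocations (group/topic names are placeholders):
$ kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --describe --group my-first-application
$ kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --group my-first-application --topic first_topic --reset-offsets --to-earliest --execute
$ kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --group my-first-application --topic first_topic --reset-offsets --shift-by -2 --execute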
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-tool-ui
www.kafkatool.com
#
# Java stuff
#
https://kafka.apache.org/documentation/
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/creating-a-kafka-project
mvn deps: org.apache.kafka:kafka-clients
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer
https://kafka.apache.org/documentation/#producerapi
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
Properties properties = new Properties();
// bootstrap.servers
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
// key.serializer
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value.serializer
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// <K, V>
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
// key and other stuff can be here as well
ProducerRecord<String, String> record = new ProducerRecord<String, String>("first-topic", "some-value");
// this is async !!!
producer.send(record);
// to actually send
producer.flush();
// flush and close
producer.close();
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer-callbacks
...
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
...
// set data asynchronously
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// executes every time a record is successfully sent or an exception is thrown
if (e == null) {
// the record was successfully sent
// log recordMetadata.topic(), .partition(), .offset(), .timestamp()
} else {
}
}
});
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer-with-keys
...
ProducerRecord<String, String> record = new ProducerRecord<String, String>("first-topic", "some-key", "some-value");
...
// set data asynchronously
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// executes every time a record is successfully sent or an exception is thrown
if (e == null) {
// the record was successfully sent
// log recordMetadata.topic(), .partition(), .offset(), .timestamp()
} else {
}
}
}).get(); // block the .send() to make it synchronous. Don't do it in PROD...
// throws ExecutionException, InterruptedException
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
Properties properties = new Properties();
// bootstrap.servers
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
// key.deserializer
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value.deserializer
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// groupId
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-app");
// auto.offset.reset
// "earliest" is equivalent to "--from-beginning" CLI option
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singleton("first-topic")); // Arrays.asList("first-topic", "second-topic")
while(true) {
// java.time.Duration
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // poll(Duration) is the new API in Kafka 2.0; poll(long) is deprecated
for (ConsumerRecord<String, String> record : records) {
// record .key(), .value(), .partition(), .offset()
}
}
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-inside-consumer-group
...
groupId = "my-fifth-application"
...
// !!! because we started 2-nd consumer, the group started rebalancing
log:
INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, group-id=my-fifth-application]
Attempt to heartbeat failed since group is rebalancing
INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, group-id=my-fifth-application]
Revoking previously assigned partitions [first_topic-0, first_topic-1, first_topic-2]
INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, group-id=my-fifth-application]
(Re-)joining group
INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, group-id=my-fifth-application]
Successfully joined group with generation 2
INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, group-id=my-fifth-application]
Setting newly assigned partitions [first_topic-0, first_topic-1]
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-with-threads
// let's get rid of the ugly while(true) loop
import java.util.concurrent.CountDownLatch;
...
public class ConsumerDemoWithThread {
public static void main(String[] args) {
new ConsumerDemoWithThread().run();
}
private ConsumerDemoWithThread() {
}
private void run() {
Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class);
...
groupId = "my-sixth-application"
CountDownLatch latch = new CountDownLatch(1);
Runnable myConsumerRunnable = new ConsumerRunnable(..., latch);
Thread myThread = new Thread(myConsumerRunnable);
myThread.start();
Runtime.getRuntime().addShutdownHook(new Thread( () -> {
logger.info("Caught shutdown hook");
((ConsumerRunnable) myConsumerRunnable).shutdown();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info("Application has exited");
}));
try {
latch.await();
} catch (InterruptedException e) {
logger.error("Application got interrupted", e);
} finally {
logger.info("Application is closing");
}
}
public class ConsumerRunnable implements Runnable {
private static Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class);
private CountDownLatch latch;
private KafkaConsumer<String, String> consumer;
public ConsumerRunnable(String bootstrapServers, String groupId, String topic, CountDownLatch latch) {
this.latch = latch;
Properties properties = new Properties();
// bootstrap.servers
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// key.deserializer
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value.deserializer
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// groupId
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
// auto.offset.reset
// "earliest" is equivalent to "--from-beginning" CLI option
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");
consumer = new KafkaConsumer<String, String>(properties);
}
@Override
public void run() {
try {
while(true) {
// java.time.Duration
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // poll(Duration) is the new API in Kafka 2.0
for (ConsumerRecord<String, String> record : records) {
// record .key(), .value(), .partition(), .offset()
}
}
} catch (WakeupException e) { // org.apache.kafka.common.errors.WakeupException
logger.info("received shutdown exception");
} finally {
// super-important!
consumer.close();
// tell our main thread that we're done with the consumer
latch.countDown();
}
}
public void shutdown() {
// a special method to interrupt consumer.poll()
// it will throw WakeupException
consumer.wakeup();
}
}
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-seek-and-assign
// assign and seek
import org.apache.kafka.common.TopicPartition;
...
remove the groupId property:
// properties.setProperty(ConsumerConfig.GROUP_ID, groupId);
and don't subscribe to topic
// consumer.subscribe(Collection.singleton("first-topic")); // Arrays.asList("first-topic", "second-topic")
// assign and seek are mostly used to replay data or fetch a specific message
// assign
TopicPartition partitionToReadFrom = new TopicPartition(topic, 0); // partition 0
consumer.assign(Arrays.asList(partitionToReadFrom));
// seek
long offsetToReadFrom = 15L;
consumer.seek(partitionToReadFrom, offsetToReadFrom); // this consumer should read from this partition starting at this offset
int numberOfMessagesToRead = 5;
boolean keepOnReading = true;
int numberOfMessagesReadSoFar = 0;
while(keepOnReading) {
// java.time.Duration
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // new in Kafka 2.0
for (ConsumerRecord<String, String> record : records) {
numberOfMessagesReadSoFar += 1;
// record .key(), .value(), .partition(), .offset()
if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
keepOnReading = false; // to exit while-loop
break; // to exit for-loop
}
}
}
logger.info("Exiting the application");
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/client-bidirectional-compatibility
// As of kafka 0.10.2 (July 2017) it has bi-directional compatibility
- an OLDER client (ex 1.1) can talk to a newer broker (2.0)
- a NEWER client (ex 2.0) can talk to an older broker (1.1)
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/twitter-setup
https://apps.twitter.com
https://developer.twitter.com
for personal use
...
keys&tokens -> Consumer API keys (API key, API secret key)
access token & access token secret
https://github.com/twitter/hbc
com.twitter:hbc-core
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-1-writing-a-twitter-client
...
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class TwitterProducer {
private static Logger logger = LoggerFactory.getLogger(TwitterProducer.class);
public TwitterProducer() {
}
public static void main(String[] args) {
new TwitterProducer().run();
}
public void run() {
// create a twitter client
BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(100000);
Client client = createTwitterClient(msgQueue);
// attempt to establish connection
client.connect();
// create a kafka producer
// loop to send tweets to kafka
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if (msg != null) {
logger.info(msg);
}
}
logger.info("End of application");
}
String consumerKey = "";
// ...
public Client createTwitterClient(BlockingQueue<String> msgQueue) {
// ...
Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
List<String> terms = Lists.newArrayList("kafka");
hosebirdEndpoint.trackTerms(terms);
Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);
ClientBuilder builder = new ClientBuilder()
.name("...")
.hosts(hosebirdHosts)
.authentication(hosebirdAuth)
.endpoint(hosebirdEndpoint)
.processor(new StringDelimitedProcessor(msgQueue));
Client hosebirdClient = builder.build();
return hosebirdClient;
}
}
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-2-writing-the-kafka-producer
...
public KafkaProducer<String, String> createKafkaProducer() {
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// <K, V>
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
return producer;
}
...
// create a kafka producer
KafkaProducer<String, String> producer = createKafkaProducer();
// add a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
logger.info("stopping twitter client");
client.stop();
logger.info("closing producer");
producer.close();
logger.info("done");
}));
// loop to send tweets to kafka
// on a different thread, or multiple different threads...
while (!client.isDone()) {
String msg = null;
try {
msg = msgQueue.poll(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
client.stop();
}
if (msg != null) {
logger.info(msg);
producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() { // topic, key, value
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
logger.error(e.getMessage(), e);
}
}
});
}
}
$ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic twitter_tweets --partitions 6 --replication-factor 1
... created topic twitter_tweets
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-configurations-introduction
note: in the logs
INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [127.0.0.1:9092]
buffer.memory = 33554432
client.id =
compression.type = none
enable.idempotence = false
interceptor.classes = []
key.serializer = org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.request.size =
value.serializer =
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/acks-and-min-insync-replicas
Producer Acks Deep Dive
acks = 0 (no acks), useful if it's ok to potentially lose messages, nice for performance
acks = 1 (leader acknowledgements)
acks = all (leader + replicas), strongest guarantee, adds safety and latency
must be used in conjunction with
min.insync.replicas - can be set at the broker or topic level (override)
min.insync.replicas = 2 (at least 2 brokers, incl the leader, must respond)
Exception: NOT_ENOUGH_REPLICAS
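one way to apply the topic-level override (a sketch; topic name is a placeholder, newer Kafka takes --bootstrap-server instead of --zookeeper):
$ kafka-configs --zookeeper 127.0.0.1:2181 --alter --entity-type topics --entity-name first_topic --add-config min.insync.replicas=2
the broker-wide default can be set via min.insync.replicas in config/server.properties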
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/retries-and-max-in-flight-requests-per-connection
Producer Retries
in case of transient failures, developers are expected to handle exceptions,
otherwise the data will be lost.
Example of transient failure:
* NotEnoughReplicasException
There is a "retries" setting
* defaults to 0
* you can increase it, even up to Integer.MAX_VALUE
In case of retries, by default there is a chance that messages will be sent out of order
(if a batch has failed to be sent)
If you rely on key-based ordering, that can be an issue
For this, you can set the setting which controls how many produce
requests can be made in parallel (per connection):
max.in.flight.requests.per.connection
* Defaults: 5
* set to 1 if you need to ensure ordering (may impact throughput)
In Kafka >= 1.0.0 there is a better (and simpler) solution: the idempotent producer
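a hedged sketch of the pre-idempotence approach above, using the same ProducerConfig constants as the snippets further down:
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1"); // preserves ordering at the cost of throughput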
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/idempotent-producer
The Producer can introduce duplicate messages in Kafka due to network errors:
if the "ack" from Kafka does not make it back to the producer because of a network error,
the producer retries the request and Kafka commits a duplicate.
In Kafka >= 0.11, you can define an "idempotent producer",
which attaches a producer id and sequence number to each request so the broker can detect and drop duplicates on retry
They (idempotent producers) come with
* retries = Integer.MAX_VALUE (2^31 - 1)
* max.in.flight.requests = 1 (Kafka >= 0.11 and < 1.1)
* max.in.flight.requests = 5 (kafka >= 1.1 - higher performance)
* acks = all
just set:
producerProps.put("enable.idempotence", "true");
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-3-safe-producer
kafka < 0.11
* acks = all
* min.insync.replicas = 2 (broker/topic level)
ensures two brokers in ISR at least have the data after an ack
* retries = MAX_INT (producer level)
ensures transient errors are retried indefinitely
* max.in.flight.requests.per.connection = 1 (producer level)
kafka >= 0.11
* enable.idempotence = true (producer level) + min.insync.replicas = 2 (broker/topic level)
implies acks = all, retries = MAX_INT, max.in.flight.requests = 5 (default)
while keeping ordering guarantees and improving performance!
and also don't get duplicates
!!! running an idempotent producer may impact throughput and latency, always test
...
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // for kafka >= 1.1
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-compression
compression.type
* none (default)
* gzip
* lz4
* snappy
it is more effective the bigger the batch of messages being sent to Kafka
* benchmark here (https://blog.cloudflare.com/squeezing-the-firehose)
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-batching
...
by default, kafka tries to send data as soon as possible
* It will have up to 5 requests in flight,
meaning up to 5 messages individually sent at the same time
* After this, if more messages have to be sent while others are in flight,
Kafka is smart and will start batching them while they wait
to send them all at once
This smart batching allows Kafka to increase throughput
while maintaining very low latency
Batches have higher compression ratio so better efficiency
all messages in a batch are sent in one request
How to control batching?
* linger.ms - the number of ms a producer is willing to wait
before sending a batch out (default 0)
by introducing some lag (for example linger.ms=5) we increase
the chances of messages being sent together in a batch
* batch.size - maximum number of bytes in a batch
the default is 16KB (can be increased to 32/64KB)
if a batch fills up before the end of the linger.ms period, it is sent to Kafka right away
any message bigger than batch.size will not be batched
A batch is allocated per partition
Note: you can monitor the average batch size metric using
* Kafka Producer Metrics
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-4-high-throughput-producer
* add "snappy" for mainly text-data
* batch size to 32Kb
* linger.ms to 20ms
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-default-partitions-and-key-hashing
by default, keys are hashed using the
* murmur2
It is possible to override the behavior of the partitioner
* partitioner.class
The formula is:
* targetPartition = Utils.abs(Utils.murmur2(record.key())) % numPartitions;
If the producer produces faster than the broker can take,
the records will be buffered in memory
* buffer.memory = 33554432 (32MB): the size of the send buffer
If the buffer is full (32MB), then the .send() method will start to block
(it won't return right away)
* max.block.ms = 60000 (1 min) - the time .send() will block
until throwing an exception, which happens when:
* the producer has filled up its buffer
* the broker is not accepting any new data
* 60 seconds have elapsed
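a hedged sketch of setting these on the producer (ProducerConfig has constants for all three; MyCustomPartitioner is a hypothetical class):
properties.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); // 32MB send buffer
properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000"); // .send() blocks up to 1 min, then throws
properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyCustomPartitioner.class.getName()); // only if overriding the default murmur2 partitioner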
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/refactoring-the-project
create maven modules:
* kafka-basics
* kafka-producer-twitter
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/setting-up-elasticsearch-in-the-cloud
https://bonsai.io/
credentials, interactive console
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/elasticsearch-101
https://www.elastic.co/guide/en/elasticsearch/reference/current/_cluster_health.html
/_cat/health?v
/_cat/nodes?v
/_cat/indices?v
PUT
/twitter
{}
/twitter/tweets/1
{
"course": "Kafka for beg...",
"instructor": "Stephane Maarek",
"module": "ElasticSearch"
}
GET
/twitter/tweets/1
{
"_index": "twitter",
"_type": "tweets",
"_id": 1,
"_version": 2,
"found": true,
"_source": {
"course": "Kafka for beg...",
"instructor": "Stephane Maarek",
"module": "ElasticSearch"
}
}
DELETE
/twitter
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-1-set-up-project
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-maven.html
dep: org.elasticsearch.client:elasticsearch-rest-high-level-client:6.4.0
public class ElasticSearchConsumer {
public static RestHighLevelClient createClient() {
// https://<username>:<password>@<hostname>
String hostname = "";
String username = "";
String password = "";
// !!! not for the case of local ES
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
RestClientBuilder builder = RestClient.builder(
new HttpHost(hostname, 443, "https"))
.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
});
RestHighLevelClient client = new RestHighLevelClient(builder);
return client;
}
public static void main(String[] args) throws IOException {
RestHighLevelClient client = createClient();
String jsonString = "{\"foo\": \"bar\"}";
IndexRequest indexRequest = new IndexRequest("twitter", "tweets")
.source(jsonString, XContentType.JSON);
// IOException
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
String id = indexResponse.getId();
logger.info(id);
client.close();
}
}
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-2-write-the-consumer-and-send-to-elasticsearch
...
// String topic = "twitter_tweets";
public static KafkaConsumer<String, String> createConsumer(String topic) {
...
String groupId = "kafka-demo-elasticsearch";
...
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
...
// "earliest" is equivalent to "--from-beginning" CLI option
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");
...
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Arrays.asList(topic));
return consumer;
}
...
public static void main(String[] args) throws IOException, InterruptedException {
...
while(true) {
// java.time.Duration
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// record .key(), .value(), .partition(), .offset()
String jsonString = record.value();
IndexRequest indexRequest = new IndexRequest(
"twitter", // index
"tweets" // type
).source(jsonString, XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
String id = indexResponse.getId();
logger.info(id);
Thread.sleep(1000); // just for demo
}
}
// client.close();
}
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-3-delivery-semantics
// making it idempotent by tracking id
...
IndexRequest indexRequest = new IndexRequest(
"twitter", // index
"tweets", // type
id // !!!
)...
...
// 2 strategies
//
// 1. kafka generic id
// String id = record.topic() + "_" + record.partition() + "_" + record.offset();
//
// 2. twitter specific id
String id = extractIdFromTweet(record.value());
IndexRequest indexRequest = new IndexRequest(
"twitter", // index
"tweets", // type
id // this is to make the consumer idempotent
).source(jsonString, XContentType.JSON);
...
// google gson
private static JsonParser jsonParser = new JsonParser();
private static String extractIdFromTweet(String tweetJson) {
return jsonParser.parse(tweetJson)
.getAsJsonObject()
.get("id_str")
.getAsString();
}
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/delivery-semantics-for-consumers
At most once
offsets are committed as soon as the message batch is received.
If the process goes wrong, the message will be lost
(it won't be read again)
At least once
offsets are committed after the message is processed
if the processing goes wrong, the message will be read again
This can result in duplicate processing of messages.
Make sure your processing is idempotent
Exactly once
only for Kafka => Kafka workflows
using Kafka Streams API
for Kafka => Sink workflows, use an idempotent consumer
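a minimal at-least-once sketch (not from the course notes above): disable auto-commit and commit only after the batch has been processed, using the same consumer API as earlier:
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
...
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// process the record; processing must be idempotent (e.g. index into ES with a deterministic id as above)
}
consumer.commitSync(); // commit offsets only after the whole batch was processed -> at least once
}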
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/delivery-semantics-for-consumers
1:00
...
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");