2022 Udemy - Apache Kafka Series - Kafka Streams for Data Processing
    https://www.udemy.com/course/kafka-streams/
    magnet:?xt=urn:btih:302b50a873ad4f997158ecad2f79f4f3e92c19f0
    !!! 895m

Apache Kafka Series - Learn Apache Kafka for Beginners v3
    https://www.udemy.com/course/apache-kafka/
    https://www.oreilly.com/library/view/apache-kafka-series/9781789342604/
    Udemy - Maarek - Apache Kafka Series - Learn Apache Kafka for Beginners v3
    magnet:?xt=urn:btih:b9bd6bbfb5193f69128c6c34db379821a58d4de2
    magnet:?xt=urn:btih:1485c91b36e8d099640dc6180dc42fd0e4a8997e
    magnet:?xt=urn:btih:6f08ccb2d05668fb929dded3c8a5744d5976e7e1
    https://freecoursesite.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-37/
    https://course-downloader.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-4463
    ! 3.11g ????

Maarek - Apache Kafka Series – Learn Apache Kafka for Beginners v2
    magnet:?xt=urn:btih:d18a88e411d34c627db87450b5631eeaed6cc193
    magnet:?xt=urn:btih:b2f4b134238fb62b1cb06182178c348efe550cb5
    magnet:?xt=urn:btih:ab0be03d946ff2cf02d1f078ec1bcb43f6a884f0
    magnet:?xt=urn:btih:47dd77a975720aa74d108a7be5ef0ef100a8f55b
    magnet:?xt=urn:btih:15b04ee6ab7bc26708ff8d59f65526bd13d822e4
    magnet:?xt=urn:btih:e67fcb091584d050ba794c51e50cf3360682149d
    magnet:?xt=urn:btih:f33d7bf93e72baf68e50bb5c0cdb055d395729fe
    magnet:?xt=urn:btih:96e54381ee4af7b2c40e81c50143e94b759a99fa
    https://tutsnode.net/apache-kafka-series-learn-apache-kafka-for-beginners-v2/
    https://course-downloader.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-4425
    https://github.com/conduktor/kafka-beginners-course
    https://github.com/conduktor/kafka-stack-docker-compose
    https://raw.githubusercontent.com/conduktor/kafka-stack-docker-compose/master/zk-single-kafka-single.yml
    https://github.com/conduktor/conduktor-platform
    https://github.com/conduktor/conduktor-platform/tree/main/example-local
    ! 3.16-10g

Udemy - Apache Kafka Series - Confluent Schema Registry & REST Proxy
    https://www.udemy.com/course/confluent-schema-registry/
    magnet:?xt=urn:btih:d67f45d8e3240f47a8efb5650b5b1ad46dde5ede
    ! 884.6m, 4h30m, interesting docker-compose landoop/fast-data-dev:cp3.3.0 ????

2019 Maarek - Learn Apache Kafka for Beginners
    !
    7h32m
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/intro-to-apache-kafka
    https://github.com/simplesteph
    https://medium.com/@stephane.maarek

TBD:
* Kafka Producer Metrics
  https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/
  https://medium.com/@aiiflo/kafka-producer-metrics-monitoring-key-metrics-aa25bafefcea
  https://docs.confluent.io/platform/current/kafka/monitoring.html#global-request-metrics
  batch-size-avg

theory

broker
  each broker is also a bootstrap server
  has an integer ID
  contains certain topic partitions, but not all of them
  after connecting to any broker (called a bootstrap broker) the client is connected to the entire cluster
  every broker knows all the metadata (which topics and partitions exist in the cluster)
topic
  a topic is a particular stream of data
  each partition is ordered; order is guaranteed only within a partition
  data is kept only for a limited amount of time (default 1 week)
  data is immutable
offsets
  incremental id per partition (unbounded)
replication factor
  3 is usual, 2 is dangerous
leader
  at any time only ONE broker can be the leader for a given partition
  only that leader can receive and serve data for the partition
  the other brokers synchronize the data - ISR (in-sync replica)
send mode, ACK
  the producer can choose to receive an ACK for data writes
  acks=0   no ack, dangerous
  acks=1   wait only for the leader to ack, limited data loss
  acks=all wait for all replicas, no data loss
key of the message
  key=null -> round-robin across partitions
  all the messages with the same key go to the same partition
consumers
  read data in order "within each partition"
consumer group
  each consumer within a group reads from exclusive partitions
  if the number of consumers is greater than the number of partitions, some consumers will be inactive
  each group has a "generation" and rebalances consumers when consumers are added/removed !!!
  by using a fresh new consumer group, it is possible to read all the messages "again"
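Not from the course - a minimal sketch, assuming the kafka-clients AdminClient and a local broker on 127.0.0.1:9092, that prints the leader and the ISR for each partition of a topic (the programmatic counterpart of the kafka-topics --describe command used below; the topic name is just an example):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;
import java.util.Properties;

public class DescribeTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // describe one topic ("first_topic" is assumed to exist)
            TopicDescription description = admin
                    .describeTopics(Collections.singletonList("first_topic"))
                    .all().get()
                    .get("first_topic");
            for (TopicPartitionInfo partition : description.partitions()) {
                // one leader per partition; the ISR are the replicas currently in sync with it
                System.out.printf("partition=%d leader=%s isr=%s%n",
                        partition.partition(), partition.leader(), partition.isr());
            }
        }
    }
}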
consumer offsets
  Kafka stores the offsets at which a consumer group has been reading
  these offsets are committed live to a Kafka topic "__consumer_offsets"
  they imply the notion of "delivery semantics"
delivery semantics
  - at most once (offset is committed as soon as the message is received)
  - at least once (offset is committed only after the message has been processed)
    the processing system needs to be idempotent in this case
  - exactly once, can be achieved for Kafka => Kafka workflows using the Streams API
zookeeper
  in PROD operates with an odd number of zookeeper servers (3, 5, 7)
  one is the Leader, the others are Followers
  zk does not store consumer offsets anymore starting from Kafka > v0.10

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/intro-to-apache-kafka
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/apache-kafka-in-five-minutes
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-theory-overview
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/topics-partitions-and-offsets
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/brokers-and-topics
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/topic-replication
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producers-and-message-keys
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-and-consumer-group
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-offsets-and-delivery-semantics
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-broker-discovery
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/zookeeper
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-guarantees
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/macos-download-and-set-up-kafka-in-path
...

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-topics-cli
kafka-topics
  common stuff: --zookeeper :2181
  --list
    list all existing topics
  --create --topic --partitions 3 --replication-factor 2
    (the replication factor cannot be larger than the number of available brokers)
  --describe
    good table of partitions
  --delete
    marks a topic for deletion (does not delete it right away) (delete.topic.enable=true)

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-console-producer-cli
kafka-console-producer
  common stuff: --broker-list :9092 --producer-property acks=all --topic
  reads line by line until Ctrl-C
  Note: a new topic is created if needed (LEADER_NOT_AVAILABLE WARN)
  with the default number of partitions and replication factor
    config/server.properties
    num.partitions=1  # default

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-console-consumer-cli
kafka-console-consumer
  common stuff: --bootstrap-server :9092 --topic
  --from-beginning
    !!! by default, this consumer only reads new messages (produced after the moment it was started)
...
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-consumers-in-groups
kafka-console-consumer
  ...
  --group
    to be part of a consumer group
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-consumer-groups-cli
kafka-consumer-groups
  common stuff: --bootstrap-server :9092
  --list
    ... console-consumer-
  --describe
    ... LAG 0 -> means the console consumer has read all the data
...
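Not from the course - a rough sketch of reading the same lag numbers programmatically, assuming the kafka-clients AdminClient, a local broker on 127.0.0.1:9092 and a hypothetical group name (the equivalent of kafka-consumer-groups --describe):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Map;
import java.util.Properties;

public class GroupLagCheck {
    public static void main(String[] args) throws Exception {
        String bootstrapServers = "127.0.0.1:9092";
        String groupId = "console-consumer-12345"; // hypothetical group name

        Properties adminProps = new Properties();
        adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        AdminClient admin = AdminClient.create(adminProps);

        // committed offsets per partition for the group (CURRENT-OFFSET in --describe)
        Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

        // log-end offsets via a throwaway consumer (LOG-END-OFFSET in --describe)
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", bootstrapServers);
        consumerProps.setProperty("key.deserializer", StringDeserializer.class.getName());
        consumerProps.setProperty("value.deserializer", StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());
            committed.forEach((tp, offset) ->
                    System.out.println(tp + " lag=" + (endOffsets.get(tp) - offset.offset())));
        }
        admin.close();
    }
}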
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/resetting-offsets
kafka-consumer-groups
  ...
  --reset-offsets --dry-run|--execute
    --by-duration --to-earliest ...
    scopes... --all-topics --topic
    --shift-by
  --describe --group

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-tool-ui
www.kafkatool.com

#
# Java stuff
#
https://kafka.apache.org/documentation/
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/creating-a-kafka-project
mvn deps: org.apache.kafka:kafka-clients

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer
https://kafka.apache.org/documentation/#producerapi

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties properties = new Properties();
// bootstrap.servers
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
// key.serializer
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value.serializer
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

// key and other stuff can be here as well
ProducerRecord<String, String> record = new ProducerRecord<>("first-topic", "some-value");

// this is async !!!
producer.send(record);
// to actually send
producer.flush();
// flush and close
producer.close();

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer-callbacks
...
import org.apache.kafka.clients.producer.Callback;
...
// send data asynchronously
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // executes every time a record is successfully sent or an exception is thrown
        if (e == null) {
            // the record was successfully sent
            // log recordMetadata.topic(), .partition(), .offset(), .timestamp()
        } else {
        }
    }
});

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer-with-keys
...
ProducerRecord<String, String> record = new ProducerRecord<>("first-topic", "some-key", "some-value");
...
// send data asynchronously
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // executes every time a record is successfully sent or an exception is thrown
        if (e == null) {
            // the record was successfully sent
            // log recordMetadata.topic(), .partition(), .offset(), .timestamp()
        } else {
        }
    }
}).get(); // block the .send() to make it synchronous. Don't do it in PROD...
// throws ExecutionException, InterruptedException
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties properties = new Properties();
// bootstrap.servers
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
// key.deserializer
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// value.deserializer
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// group.id
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-app");
// auto.offset.reset
// "earliest" is equivalent to the "--from-beginning" CLI option
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("first-topic")); // Arrays.asList("first-topic", "second-topic")

while (true) {
    // java.time.Duration; poll(Duration) replaces the deprecated poll(long)
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // record .key(), .value(), .partition(), .offset()
    }
}

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-inside-consumer-group
...
groupId = "my-fifth-application"
...
// !!! because we started a 2nd consumer, the group started rebalancing
log:
INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application] Attempt to heartbeat failed since group is rebalancing
INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application] Revoking previously assigned partitions [first_topic-0, first_topic-1, first_topic-2]
INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application] (Re-)joining group
INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application] Successfully joined group with generation 2
INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application] Setting newly assigned partitions [first_topic-0, first_topic-1]

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-with-threads
// let's get rid of the ugly while(true) loop
import java.util.concurrent.CountDownLatch;
...
public class ConsumerDemoWithThread {
    public static void main(String[] args) {
        new ConsumerDemoWithThread().run();
    }

    private ConsumerDemoWithThread() {
    }

    private void run() {
        Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class);
        ...
        groupId = "my-sixth-application"
        CountDownLatch latch = new CountDownLatch(1);

        Runnable myConsumerRunnable = new ConsumerRunnable(..., latch);
        Thread myThread = new Thread(myConsumerRunnable);
        myThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("Caught shutdown hook");
            ((ConsumerRunnable) myConsumerRunnable).shutdown();
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("Application has exited");
        }));

        try {
            latch.await();
        } catch (InterruptedException e) {
            logger.error("Application got interrupted", e);
        } finally {
            logger.info("Application is closing");
        }
    }

public class ConsumerRunnable implements Runnable {
    private static Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class);
    private CountDownLatch latch;
    private KafkaConsumer<String, String> consumer;

    public ConsumerRunnable(String bootstrapServers, String groupId, String topic, CountDownLatch latch) {
        this.latch = latch;
        Properties properties = new Properties();
        // bootstrap.servers
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // key.deserializer
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // value.deserializer
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // group.id
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        // auto.offset.reset
        // "earliest" is equivalent to the "--from-beginning" CLI option
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");

        consumer = new KafkaConsumer<>(properties);
    }

    @Override
    public void run() {
        try {
            while (true) {
                // java.time.Duration; poll(Duration) replaces the deprecated poll(long)
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // record .key(), .value(), .partition(), .offset()
                }
            }
        } catch (WakeupException e) { // org.apache.kafka.common.errors.WakeupException
            logger.info("received shutdown exception");
        } finally {
            // super-important!
            consumer.close();
            // tell our main thread that we're done with the consumer
            latch.countDown();
        }
    }

    public void shutdown() {
        // a special method to interrupt consumer.poll()
        // it will throw WakeupException
        consumer.wakeup();
    }
}

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-seek-and-assign
// assign and seek
import org.apache.kafka.common.TopicPartition;
...
remove the groupId property:
// properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
and don't subscribe to the topic:
// consumer.subscribe(Collections.singleton("first-topic")); // Arrays.asList("first-topic", "second-topic")

// assign and seek are mostly used to replay data or fetch a specific message

// assign
TopicPartition partitionToReadFrom = new TopicPartition(topic, 0); // partition 0
consumer.assign(Arrays.asList(partitionToReadFrom));

// seek
long offsetToReadFrom = 15L;
consumer.seek(partitionToReadFrom, offsetToReadFrom);

// this consumer should read from these partitions and offsets
int numberOfMessagesToRead = 5;
boolean keepOnReading = true;
int numberOfMessagesReadSoFar = 0;

while (keepOnReading) {
    // java.time.Duration
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // new in Kafka 2.0
    for (ConsumerRecord<String, String> record : records) {
        numberOfMessagesReadSoFar += 1;
        // record .key(), .value(), .partition(), .offset()
        if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
            keepOnReading = false; // to exit the while loop
            break; // to exit the for loop
        }
    }
}
logger.info("Exiting the application");

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/client-bidirectional-compatibility
// As of Kafka 0.10.2 (July 2017) clients and brokers have bi-directional compatibility
- an OLDER client (e.g. 1.1) can talk to a newer broker (2.0)
- a NEWER client (e.g. 2.0) can talk to an older broker (1.1)

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/twitter-setup
https://apps.twitter.com
https://developer.twitter.com
for personal use
... keys & tokens -> Consumer API keys (API key, API secret key)
access token & access token secret
https://github.com/twitter/hbc
com.twitter:hbc-core

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-1-writing-a-twitter-client
...
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TwitterProducer {
    private static Logger logger = LoggerFactory.getLogger(TwitterProducer.class);

    public TwitterProducer() {
    }

    public static void main(String[] args) {
        new TwitterProducer().run();
    }

    public void run() {
        // create a twitter client
        BlockingQueue<String> msgQueue = new LinkedBlockingQueue<>(100000);
        Client client = createTwitterClient(msgQueue);
        // attempt to establish a connection
        client.connect();

        // create a kafka producer

        // loop to send tweets to kafka
        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }
            if (msg != null) {
                logger.info(msg);
            }
        }
        logger.info("End of application");
    }

    String consumerKey = "";
    // ...

    public Client createTwitterClient(BlockingQueue<String> msgQueue) {
        // ...
        Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
        StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
        List<String> terms = Lists.newArrayList("kafka");
        hosebirdEndpoint.trackTerms(terms);

        Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);

        ClientBuilder builder = new ClientBuilder()
                .name("...")
                .hosts(hosebirdHosts)
                .authentication(hosebirdAuth)
                .endpoint(hosebirdEndpoint)
                .processor(new StringDelimitedProcessor(msgQueue));

        Client hosebirdClient = builder.build();
        return hosebirdClient;
    }
}

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-2-writing-the-kafka-producer
...
public KafkaProducer<String, String> createKafkaProducer() {
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    //
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    return producer;
}
...
// create a kafka producer
KafkaProducer<String, String> producer = createKafkaProducer();

// add a shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    logger.info("stopping twitter client");
    client.stop();
    logger.info("closing producer");
    producer.close(); // KafkaProducer has close(), not stop()
    logger.info("done");
}));

// loop to send tweets to kafka
// on a different thread, or multiple different threads...
while (!client.isDone()) {
    String msg = null;
    try {
        msg = msgQueue.poll(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        e.printStackTrace();
        client.stop();
    }
    if (msg != null) {
        logger.info(msg);
        producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() { // topic, key, value
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }
}

$ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic twitter_tweets --partitions 6 --replication-factor 1
... created topic twitter_tweets

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-configurations-introduction
note: in the logs
INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
    acks = 1
    batch.size = 16384
    bootstrap.servers = [127.0.0.1:9092]
    buffer.memory = 33554432
    client.id =
    compression.type = none
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.request.size =
    value.serializer =

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/acks-and-min-insync-replicas
Producer Acks Deep Dive
acks = 0 (no acks)
    useful if it's ok to potentially lose messages, nice for performance
acks = 1 (leader acknowledgements)
acks = all (leader + replicas)
    strongest guarantee, adds safety but also latency
    must be used in conjunction with min.insync.replicas
      - can be set at the broker or topic level (override)
    min.insync.replicas = 2 (at least 2 brokers, incl. the leader, must respond)
    Exception: NOT_ENOUGH_REPLICAS
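Not in the course material above, but as an illustration of where min.insync.replicas lives: it is a broker/topic-level config, so one way to set it per topic is at creation time through the AdminClient. A minimal sketch, assuming a 3-broker cluster on 127.0.0.1:9092 and a hypothetical topic name (on the single-broker setups used earlier in these notes this would fail, since the replication factor cannot exceed the broker count):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateSafeTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // hypothetical topic: 3 partitions, replication factor 3
            NewTopic topic = new NewTopic("important_topic", 3, (short) 3);
            // topic-level override: writes with acks=all need at least 2 in-sync replicas
            topic.configs(Collections.singletonMap("min.insync.replicas", "2"));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}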
https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/retries-and-max-in-flight-requests-per-connection
Producer Retries
in case of transient failures, developers are expected to handle exceptions, otherwise the data will be lost
Example of a transient failure:
* NotEnoughReplicasException
There is a "retries" setting
* defaults to 0
* you can increase it, even up to Integer.MAX_VALUE
In case of retries, by default there is a chance that messages will be sent out of order (if a batch has failed to be sent)
If you rely on key-based ordering, that can be an issue
For this, you can set the setting which controls how many produce requests can be made in parallel (to any partition):
max.in.flight.requests.per.connection
* default: 5
* set to 1 if you need to ensure ordering (may impact throughput)
In Kafka >= 1.0.0 there is a better (and simpler) solution

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/idempotent-producer
The producer can introduce duplicate messages in Kafka due to network errors:
if the ack from Kafka does not make it back to the producer, the producer retries the request and Kafka commits a duplicate.
In Kafka >= 0.11 you can define an "idempotent producer", which adds a produce request id so the broker can detect and drop duplicates.
Idempotent producers come with
* retries = Integer.MAX_VALUE (2^31 - 1)
* max.in.flight.requests = 1 (Kafka >= 0.11 and < 1.1)
* max.in.flight.requests = 5 (Kafka >= 1.1 - higher performance)
* acks = all
just set: producerProps.put("enable.idempotence", "true");

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-3-safe-producer
kafka < 0.11
* acks = all
* min.insync.replicas = 2 (broker/topic level)
    ensures at least two brokers in the ISR have the data after an ack
* retries = MAX_INT (producer level)
    ensures transient errors are retried indefinitely
* max.in.flight.requests.per.connection = 1 (producer level)
kafka >= 0.11
* enable.idempotence = true (producer level) + min.insync.replicas = 2 (broker/topic level)
    implies acks = all, retries = MAX_INT, max.in.flight.requests = 5 (default)
    while keeping ordering guarantees and improving performance!
    and also no duplicates
!!! running an idempotent producer may impact throughput and latency, always test
...
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // for kafka >= 1.1

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-compression
compression.type
* none (default)
* gzip
* lz4
* snappy
compression is more effective the bigger the batch of messages being sent to Kafka
* benchmark here: https://blog.cloudflare.com/squeezing-the-firehose

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-batching
...
by default, kafka tries to send data as soon as possible
* it will have up to 5 requests in flight, meaning up to 5 messages individually sent at the same time
* after this, if more messages have to be sent while others are in flight, Kafka is smart and will start batching them while they wait, to send them all at once
this smart batching allows Kafka to increase throughput while maintaining very low latency
batches have a higher compression ratio, so better efficiency
all messages in a batch are sent in one request
How to control batching?
* linger.ms - number of ms a producer is willing to wait before sending a batch out (default 0)
    by introducing some lag (for example linger.ms=5) we increase the chances of messages being sent together in a batch
* if the batch is full (see batch.size) before the end of the linger.ms period, it will be sent to Kafka right away
    batch.size default is 16 KB (can increase to 32/64 KB)
    any message bigger than batch.size will not be batched
    a batch is allocated per partition
Note: you can monitor the average batch size metric using
* Kafka Producer Metrics

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-4-high-throughput-producer
* add "snappy" compression - good for mainly text data
* batch size to 32 KB
* linger.ms to 20 ms
properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-default-partitions-and-key-hashing
by default, keys are hashed using
* murmur2
it is possible to override the behavior of the partitioner
* partitioner.class
the formula is:
* targetPartition = Utils.abs(Utils.murmur2(record.key())) % numPartitions;

if the producer produces faster than the broker can take, the records will be buffered in memory
* buffer.memory = 33554432 (32 MB): the size of the send buffer
if the buffer is full (32 MB), then the .send() method will start to block (won't return right away)
* max.block.ms = 60000 (1 min) - the time .send() will block until throwing an exception, for a couple of reasons:
    * the producer has filled up its buffer
    * the broker is not accepting any new data
    * 60 seconds have elapsed

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/refactoring-the-project
create maven modules:
* kafka-basics
* kafka-producer-twitter

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/setting-up-elasticsearch-in-the-cloud
https://bonsai.io/
credentials, interactive console

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/elasticsearch-101
https://www.elastic.co/guide/en/elasticsearch/reference/current/_cluster_health.html
/_cat/health?v
/_cat/nodes?v
/_cat/indices?v
PUT /twitter
{}
PUT /twitter/tweets/1
{
  "course": "Kafka for beg...",
  "instructor": "Stephane Maarek",
  "module": "ElasticSearch"
}
GET /twitter/tweets/1
{
  "_index": "twitter",
  "_type": "tweets",
  "_id": 1,
  "_version": 2,
  "found": true,
  "_source": {
    "course": "Kafka for beg...",
    "instructor": "Stephane Maarek",
    "module": "ElasticSearch"
  }
}
DELETE /twitter

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-1-set-up-project
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-maven.html
dep: org.elasticsearch.client:elasticsearch-rest-high-level-client:6.4.0

public class ElasticSearchConsumer {
    private static Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class);

    public static RestHighLevelClient createClient() {
        // https://:@
        String hostname = "";
        String username = "";
        String password = "";
        // !!! the credentials part is not needed for a local ES (no auth)
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

        RestClientBuilder builder = RestClient.builder(new HttpHost(hostname, 443, "https"))
                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });

        RestHighLevelClient client = new RestHighLevelClient(builder);
        return client;
    }

    public static void main(String[] args) throws IOException {
        RestHighLevelClient client = createClient();

        String jsonString = "{\"foo\": \"bar\"}";
        IndexRequest indexRequest = new IndexRequest("twitter", "tweets")
                .source(jsonString, XContentType.JSON);

        // IOException
        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        String id = indexResponse.getId();
        logger.info(id);

        client.close();
    }
}

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-2-write-the-consumer-and-send-to-elasticsearch
...
// String topic = "twitter_tweets";
public static KafkaConsumer<String, String> createConsumer(String topic) {
    ...
    String groupId = "kafka-demo-elasticsearch";
    ...
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    ...
    // "earliest" is equivalent to the "--from-beginning" CLI option
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");
    ...
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Arrays.asList(topic));
    return consumer;
}
...
public static void main(String[] args) {
    ...
    while (true) {
        // java.time.Duration
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // record .key(), .value(), .partition(), .offset()
            String jsonString = record.value();
            IndexRequest indexRequest = new IndexRequest(
                    "twitter", // index
                    "tweets"   // type
            ).source(jsonString, XContentType.JSON);

            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            String id = indexResponse.getId();
            logger.info(id);
            Thread.sleep(1000); // just for the demo
        }
    }
    // client.close();
}

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-3-delivery-semantics
// making the consumer idempotent by tracking an id
...
IndexRequest indexRequest = new IndexRequest(
        "twitter", // index
        "tweets",  // type
        id         // !!!
)...
...
// 2 strategies
//
// 1. kafka generic id
// String id = record.topic() + "_" + record.partition() + "_" + record.offset();
//
// 2. twitter specific id
String id = extractIdFromTweet(record.value());
IndexRequest indexRequest = new IndexRequest(
        "twitter", // index
        "tweets",  // type
        id         // this is to make the consumer idempotent
).source(jsonString, XContentType.JSON);
...
// google gson
private static JsonParser jsonParser = new JsonParser();

private static String extractIdFromTweet(String tweetJson) {
    return jsonParser.parse(tweetJson)
            .getAsJsonObject()
            .get("id_str")
            .getAsString();
}

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/delivery-semantics-for-consumers
At most once
    offsets are committed as soon as the message batch is received
    if the processing goes wrong, the message will be lost (it won't be read again)
At least once
    offsets are committed after the message is processed
    if the processing goes wrong, the message will be read again
    this can result in duplicate processing of messages - make sure your processing is idempotent
Exactly once
    only for Kafka => Kafka workflows, using the Kafka Streams API
    for Kafka => Sink workflows, use an idempotent consumer

https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/delivery-semantics-for-consumers 1:00
...
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");
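Not in these notes (the course covers commit strategies later), but a minimal sketch of the at-least-once pattern described above, assuming a local broker, the twitter_tweets topic and a hypothetical group id: disable auto-commit and commit offsets only after the batch has been processed.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AtLeastOnceConsumerSketch {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once-demo"); // hypothetical group
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // take control of offset commits: at-least-once = commit AFTER processing
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singleton("twitter_tweets"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // process the record here (e.g. index into Elasticsearch);
                    // keep the processing idempotent: a crash before commitSync()
                    // means the whole batch will be read again
                }
                consumer.commitSync(); // commit only once the batch has been processed
            }
        }
    }
}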