2022

Udemy - Apache Kafka Series - Kafka Streams for Data Processing
    https://www.udemy.com/course/kafka-streams/
    magnet:?xt=urn:btih:302b50a873ad4f997158ecad2f79f4f3e92c19f0
    !!! 895m
Apache Kafka Series - Learn Apache Kafka for Beginners v3
    https://www.udemy.com/course/apache-kafka/
    https://www.oreilly.com/library/view/apache-kafka-series/9781789342604/
Udemy - Maarek - Apache Kafka Series - Learn Apache Kafka for Beginners v3
    magnet:?xt=urn:btih:b9bd6bbfb5193f69128c6c34db379821a58d4de2
    magnet:?xt=urn:btih:1485c91b36e8d099640dc6180dc42fd0e4a8997e
    magnet:?xt=urn:btih:6f08ccb2d05668fb929dded3c8a5744d5976e7e1
    https://freecoursesite.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-37/
    https://course-downloader.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-4463
    ! 3.11g
????
Maarek - Apache Kafka Series – Learn Apache Kafka for Beginners v2
    magnet:?xt=urn:btih:d18a88e411d34c627db87450b5631eeaed6cc193
    magnet:?xt=urn:btih:b2f4b134238fb62b1cb06182178c348efe550cb5
    magnet:?xt=urn:btih:ab0be03d946ff2cf02d1f078ec1bcb43f6a884f0
    magnet:?xt=urn:btih:47dd77a975720aa74d108a7be5ef0ef100a8f55b
    magnet:?xt=urn:btih:15b04ee6ab7bc26708ff8d59f65526bd13d822e4
    magnet:?xt=urn:btih:e67fcb091584d050ba794c51e50cf3360682149d
    magnet:?xt=urn:btih:f33d7bf93e72baf68e50bb5c0cdb055d395729fe
    magnet:?xt=urn:btih:96e54381ee4af7b2c40e81c50143e94b759a99fa
    https://tutsnode.net/apache-kafka-series-learn-apache-kafka-for-beginners-v2/
    https://course-downloader.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-4425
    https://github.com/conduktor/kafka-beginners-course
    https://github.com/conduktor/kafka-stack-docker-compose
    https://raw.githubusercontent.com/conduktor/kafka-stack-docker-compose/master/zk-single-kafka-single.yml
    https://github.com/conduktor/conduktor-platform
    https://github.com/conduktor/conduktor-platform/tree/main/example-local
    ! 3.16-10g
Udemy - Apache Kafka Series - Confluent Schema Registry & REST Proxy
    https://www.udemy.com/course/confluent-schema-registry/
    magnet:?xt=urn:btih:d67f45d8e3240f47a8efb5650b5b1ad46dde5ede
    ! 884.6m, 4h30m, interesting docker-compose
        landoop/fast-data-dev:cp3.3.0
????
2019
Maarek - Learn Apache Kafka for Beginners
    ! 7h32m
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/intro-to-apache-kafka
    https://github.com/simplesteph
    https://medium.com/@stephane.maarek
TBD:
    * Kafka Producer Metrics
    https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/
    https://medium.com/@aiiflo/kafka-producer-metrics-monitoring-key-metrics-aa25bafefcea
    https://docs.confluent.io/platform/current/kafka/monitoring.html#global-request-metrics
        batch-size-avg

theory
    broker (each broker is also a bootstrap server)
        has an integer ID
        contains certain topic partitions, but not all of them
        after connecting to any broker (called a bootstrap broker), the client is connected to the entire cluster
        each broker knows all the metadata (which topics and partitions exist in the cluster)
    topics
        a topic is a particular stream of data
    partitions
        a partition is ordered; order is guaranteed only within a partition
        data is kept only for a limited amount of time (default: 1 week)
        data is immutable
    offsets
        incremental id (infinite)
    replication factor
        3 is usual, 2 is dangerous
    leader
        at any time only ONE broker can be the Leader for a given partition
        only that leader can receive and serve data for the partition
        the other brokers synchronize the data - ISR (in-sync replica)
    send-mode, ACK
        the producer can choose to receive an ACK for data writes
        acks=0  , no ack, dangerous
            =1  , wait only for the Leader to ack, limited data loss
            =all, wait for all replicas, no data loss
    key of the message
        key=null -> round-robin
        all the messages with the same key go to the same partition
    consumers
        read data in order "within each partition"
    consumer-group
        each consumer within a group reads from exclusive partitions
        if the number of consumers is greater than the number of partitions, some consumers will be inactive
        each group has a "generation" and rebalances consumers upon adding/removing consumers in the group
        !!! by using a new fresh consumer group, it is possible to read all the messages "again"
    consumer-offsets
        kafka stores the offsets at which a consumer group has been reading
        these offsets are committed live in a Kafka topic "__consumer_offsets"
        these imply the "delivery semantics" notion
    delivery semantics
        - at most once (offset is committed as soon as the message is received)
        - at least once (offset is committed only after the message has been processed)
            the processing system needs to be idempotent in this case
        - exactly once, can be achieved for Kafka=>Kafka workflows using the Kafka Streams API
    zookeeper
        in PROD operate with an odd number of zookeeper servers (3, 5, 7)
        one is the Leader, the others are Followers
        zk does not store consumer offsets anymore starting from Kafka > v0.10


    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/intro-to-apache-kafka
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/apache-kafka-in-five-minutes
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-theory-overview
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/topics-partitions-and-offsets
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/brokers-and-topics
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/topic-replication
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producers-and-message-keys
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-and-consumer-group
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-offsets-and-delivery-semantics
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-broker-discovery
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/zookeeper
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-guarantees
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/macos-download-and-set-up-kafka-in-path
    ...
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-topics-cli
        kafka-topics
            common-stuff:
                --zookeeper <host>:2181

            --list
                list all existing topics
            --create
                --topic <topic-name>
                --partitions 3
                --replication-factor 2 (cannot be more than the number of available brokers)
            --describe
                good table of partitions
            --delete
                marks a topic for deletion (does not delete it right away)
                (delete.topic.enable=true)
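            e.g. a full create command (assuming a local zookeeper on 127.0.0.1:2181 and a hypothetical topic first_topic):
                $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic first_topic --partitions 3 --replication-factor 1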
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-console-producer-cli
        kafka-console-producer
            common-stuff:
                --broker-list <host>:9092
            --producer-property
                acks=all
            --topic <topic-name>
                reads lines from stdin, line by line, until Ctrl-C
                Note: a new topic is created if needed (LEADER_NOT_AVAILABLE WARN)
                      with the default number of partitions and replication factor
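            e.g. (assuming a local broker on 127.0.0.1:9092 and a hypothetical topic first_topic):
                $ kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=all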
        config/server.properties
            num.partitions=1 # default
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-console-consumer-cli
        kafka-console-consumer
            common-stuff:
                --bootstrap-server <host>:9092
            --topic <topic-name>
            --from-beginning
                !!! by default, this consumer only reads new messages (from the moment it starts)
            ...
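            e.g. (same assumed local broker and hypothetical topic):
                $ kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning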
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-consumers-in-groups
        kafka-console-consumer
            ...
            --group <consumer-group-name>
                to be part of a consumer group
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-consumer-groups-cli
        kafka-consumer-groups
            common-stuff:
                --bootstrap-server <host>:9092
            --list
                ... console-consumer-<id>
            --describe <group-name>
                ... LAG 0 -> means the console consumer has read all the data
            ...
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/resetting-offsets
        kafka-consumer-groups
            ...
            --reset-offsets
                --dry-run|--execute
                --by-duration
                --to-earliest
                ... scopes...
                --all-topics
                --topic <topic-name>
            --shift-by <num>
            --describe
                --group <group-name>
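            e.g. a reset to the earliest offsets of one topic (hypothetical group and topic names):
                $ kafka-consumer-groups --bootstrap-server 127.0.0.1:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic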
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-tool-ui
        www.kafkatool.com

    #
    # Java stuff
    #
        https://kafka.apache.org/documentation/

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/creating-a-kafka-project
        mvn deps: org.apache.kafka:kafka-clients
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer
        https://kafka.apache.org/documentation/#producerapi

        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.StringSerializer;

        Properties properties = new Properties();
        // bootstrap.servers
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
        // key.serializer
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value.serializer
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // <K, V>
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // key and other stuff can be here as well
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("first-topic", "some-value");

        // this is async !!!
        producer.send(record);

        // to actually send
        producer.flush();

        // flush and close
        producer.close();

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer-callbacks
        ...
        import org.apache.kafka.clients.producer.Callback;
        ...
        // send data asynchronously
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // executes every time a record is successfully sent or an exception is thrown
                if (e == null) {
                    // the record was successfully sent
                    // log recordMetadata.topic(), .partition(), .offset(), .timestamp()
                } else {
                    // log the exception
                }
            }
        });
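        since Callback is a functional interface, the same callback can also be written as a lambda (equivalent sketch):

        producer.send(record, (recordMetadata, e) -> {
            if (e == null) {
                // log recordMetadata.topic(), .partition(), .offset(), .timestamp()
            } else {
                // log the exception
            }
        });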
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer-with-keys
        ...
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("first-topic", "some-key", "some-value");
        ...
        // send data asynchronously
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // executes every time a record is successfully sent or an exception is thrown
                if (e == null) {
                    // the record was successfully sent
                    // log recordMetadata.topic(), .partition(), .offset(), .timestamp()
                } else {
                    // log the exception
                }
            }
        }).get(); // block the .send() to make it synchronous. Don't do this in PROD...
          // throws ExecutionException, InterruptedException
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.serialization.StringDeserializer;

        Properties properties = new Properties();
        // bootstrap.servers
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
        // key.deserializer
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // value.deserializer
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // group.id
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-app");
        // auto.offset.reset
        // "earliest" is equivalent to the "--from-beginning" CLI option
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Collections.singleton("first-topic")); // Arrays.asList("first-topic", "second-topic")

        while (true) {
            // java.time.Duration
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // new in Kafka 2.0 (poll(long) is deprecated)
            for (ConsumerRecord<String, String> record : records) {
                // record .key(), .value(), .partition(), .offset()
            }
        }
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-inside-consumer-group
        ...
        groupId = "my-fifth-application"
        ...
        // !!! because we started a 2-nd consumer, the group started rebalancing
        log:
        INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application]
            Attempt to heartbeat failed since group is rebalancing
        INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application]
            Revoking previously assigned partitions [first_topic-0, first_topic-1, first_topic-2]
        INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application]
            (Re-)joining group
        INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application]
            Successfully joined group with generation 2
        INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application]
            Setting newly assigned partitions [first_topic-0, first_topic-1]
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-with-threads
        // let's get rid of the ugly while(true) loop
        import java.util.concurrent.CountDownLatch;
        ...
        public class ConsumerDemoWithThread {

            public static void main(String[] args) {
                new ConsumerDemoWithThread().run();
            }

            private ConsumerDemoWithThread() {
            }

            private void run() {
                Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class);
                ...
                groupId = "my-sixth-application"

                CountDownLatch latch = new CountDownLatch(1);

                Runnable myConsumerRunnable = new ConsumerRunnable(..., latch);

                Thread myThread = new Thread(myConsumerRunnable);
                myThread.start();

                Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                    logger.info("Caught shutdown hook");
                    ((ConsumerRunnable) myConsumerRunnable).shutdown();
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    logger.info("Application has exited");
                }));

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    logger.error("Application is interrupting", e);
                } finally {
                    logger.info("Application is closing");
                }
            }

            public class ConsumerRunnable implements Runnable {
                private Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class);

                private CountDownLatch latch;
                private KafkaConsumer<String, String> consumer;

                public ConsumerRunnable(String bootstrapServers, String groupId, String topic, CountDownLatch latch) {
                    this.latch = latch;

                    Properties properties = new Properties();
                    // bootstrap.servers
                    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                    // key.deserializer
                    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    // value.deserializer
                    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    // group.id
                    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                    // auto.offset.reset
                    // "earliest" is equivalent to the "--from-beginning" CLI option
                    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");

                    consumer = new KafkaConsumer<String, String>(properties);
                    consumer.subscribe(Collections.singleton(topic));
                }

                @Override
                public void run() {
                    try {
                        while (true) {
                            // java.time.Duration
                            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // new in Kafka 2.0
                            for (ConsumerRecord<String, String> record : records) {
                                // record .key(), .value(), .partition(), .offset()
                            }
                        }
                    } catch (WakeupException e) {
                        logger.info("received shutdown exception");
                    } finally {
                        // super-important!
                        consumer.close();
                        // tell our main thread that we're done with the consumer
                        latch.countDown();
                    }
                }

                public void shutdown() {
                    // a special method to interrupt consumer.poll()
                    // it will throw WakeupException
                    consumer.wakeup();
                }
            }
        }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-seek-and-assign
        // assign and seek
        import org.apache.kafka.common.TopicPartition;
        ...
        remove the groupId property:
        // properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        and don't subscribe to a topic:
        // consumer.subscribe(Collections.singleton("first-topic")); // Arrays.asList("first-topic", "second-topic")

        // assign and seek are mostly used to replay data or fetch a specific message

        // assign
        TopicPartition partitionToReadFrom = new TopicPartition(topic, 0); // partition 0
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek
        long offsetToReadFrom = 15L;
        consumer.seek(partitionToReadFrom, offsetToReadFrom); // this consumer should read from this partition and offset

        int numberOfMessagesToRead = 5;
        boolean keepOnReading = true;
        int numberOfMessagesReadSoFar = 0;

        while (keepOnReading) {
            // java.time.Duration
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // new in Kafka 2.0
            for (ConsumerRecord<String, String> record : records) {
                numberOfMessagesReadSoFar += 1;
                // record .key(), .value(), .partition(), .offset()
                if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
                    keepOnReading = false; // to exit the while-loop
                    break; // to exit the for-loop
                }
            }
        }
        logger.info("Exiting the application");

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/client-bidirectional-compatibility
        // As of Kafka 0.10.2 (July 2017), clients and brokers have bi-directional compatibility
        - an OLDER client (e.g. 1.1) can talk to a newer broker (2.0)
        - a NEWER client (e.g. 2.0) can talk to an older broker (1.1)

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/twitter-setup
        https://apps.twitter.com
        https://developer.twitter.com
            for personal use
        ...
            keys & tokens -> Consumer API keys (API key, API secret key)
            access token & access token secret
        https://github.com/twitter/hbc
            com.twitter:hbc-core

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-1-writing-a-twitter-client
        ...
        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;

        public class TwitterProducer {
            private static Logger logger = LoggerFactory.getLogger(TwitterProducer.class);

            public TwitterProducer() {
            }

            public static void main(String[] args) {
                new TwitterProducer().run();
            }

            public void run() {
                // create a twitter client
                BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(100000);
                Client client = createTwitterClient(msgQueue);

                // attempt to establish a connection
                client.connect();

                // create a kafka producer

                // loop to send tweets to kafka
                while (!client.isDone()) {
                    String msg = null; // declared outside the try so it is visible below
                    try {
                        msg = msgQueue.poll(5, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        client.stop();
                    }
                    if (msg != null) {
                        logger.info(msg);
                    }
                }
                logger.info("End of application");
            }

            String consumerKey = "";
            // ...

            public Client createTwitterClient(BlockingQueue<String> msgQueue) {
                // ...
                Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
                StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

                List<String> terms = Lists.newArrayList("kafka");
                hosebirdEndpoint.trackTerms(terms);

                Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);

                ClientBuilder builder = new ClientBuilder()
                    .name("...")
                    .hosts(hosebirdHosts)
                    .authentication(hosebirdAuth)
                    .endpoint(hosebirdEndpoint)
                    .processor(new StringDelimitedProcessor(msgQueue));

                Client hosebirdClient = builder.build();
                return hosebirdClient;
            }
        }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-2-writing-the-kafka-producer
        ...
        public KafkaProducer<String, String> createKafkaProducer() {
            Properties properties = new Properties();
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // <K, V>
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
            return producer;
        }
        ...
        // create a kafka producer
        KafkaProducer<String, String> producer = createKafkaProducer();

        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping twitter client");
            client.stop();
            logger.info("closing producer");
            producer.close();
            logger.info("done");
        }));

        // loop to send tweets to kafka
        // on a different thread, or multiple different threads...
        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }
            if (msg != null) {
                logger.info(msg);
                producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() { // topic, key, value
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                });
            }
        }

        $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic twitter_tweets --partitions 6 --replication-factor 1
          ... created topic twitter_tweets

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-configurations-introduction
        note: in the logs
        INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
            acks = 1
            batch.size = 16384
            bootstrap.servers = [127.0.0.1:9092]
            buffer.memory = 33554432
            client.id =
            compression.type = none
            enable.idempotence = false
            interceptor.classes = []
            key.serializer = org.apache.kafka.common.serialization.StringSerializer
            linger.ms = 0
            max.request.size = 1048576
            value.serializer = org.apache.kafka.common.serialization.StringSerializer

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/acks-and-min-insync-replicas
        Producer Acks Deep Dive
            acks = 0 (no acks), useful if it's ok to potentially lose messages, nice for performance
            acks = 1 (leader acknowledgements)
            acks = all (leader + replicas), strongest guarantee, adds safety at the cost of latency
                must be used in conjunction with
                    min.insync.replicas - can be set at the broker or topic level (override)
                    min.insync.replicas = 2 (at least 2 brokers, including the leader, must respond)
                        Exception: NOT_ENOUGH_REPLICAS
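                    e.g. a topic-level override (hypothetical topic name, local zookeeper assumed):
                        $ kafka-configs --zookeeper 127.0.0.1:2181 --alter --entity-type topics --entity-name first_topic --add-config min.insync.replicas=2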

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/retries-and-max-in-flight-requests-per-connection
        Producer Retries
            in case of transient failures, developers are expected to handle exceptions,
            otherwise the data will be lost.
            Example of a transient failure:
                * NotEnoughReplicasException
            There is a "retries" setting
                * defaults to 0
                * you can increase it, even up to Integer.MAX_VALUE
            In case of retries, by default there is a chance that messages will be sent out of order
            (if a batch has failed to be sent)
            If you rely on key-based ordering, that can be an issue
            For this, you can set the setting which controls how many produce
            requests can be made in parallel (to any partition):
                max.in.flight.requests.per.connection
                    * default: 5
                    * set to 1 if you need to ensure ordering (may impact throughput)
            In Kafka >= 1.0.0 there is a better (and simpler) solution

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/idempotent-producer
        The producer can introduce duplicate messages in Kafka due to network errors:
        if the "ack" from Kafka does not make it back to the producer because of a network error,
        the producer sends a "retry" request and Kafka commits a duplicate

        In Kafka >= 0.11, you can define an "idempotent producer",
        which deduplicates retried requests via a produce request id

        They (idempotent producers) come with
            * retries                = Integer.MAX_VALUE (2^31 - 1)
            * max.in.flight.requests = 1 (0.11 <= kafka < 1.1)
            * max.in.flight.requests = 5 (kafka >= 1.1 - higher performance)
            * acks = all

        just set:
            producerProps.put("enable.idempotence", "true")

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-3-safe-producer
        kafka < 0.11
            * acks = all
            * min.insync.replicas = 2 (broker/topic level)
                ensures at least two brokers in the ISR have the data after an ack
            * retries = MAX_INT (producer level)
                ensures transient errors are retried indefinitely
            * max.in.flight.requests.per.connection = 1 (producer level)

        kafka >= 0.11
            * enable.idempotence = true (producer level) + min.insync.replicas = 2 (broker/topic level)
                implies acks = all, retries = MAX_INT, max.in.flight.requests = 5 (default)
                while keeping ordering guarantees and improving performance!
                and also no duplicates

        !!! running an idempotent producer may impact throughput and latency, always test
        ...
            properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
            properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
            properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // for kafka >= 1.1

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-compression
        compression.type
            * none (default)
            * gzip
            * lz4
            * snappy

        compression is more effective the bigger the batch of messages being sent to Kafka
            * benchmark here: https://blog.cloudflare.com/squeezing-the-firehose

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-batching
        ...
        by default, kafka tries to send data as soon as possible
            * it will have up to 5 requests in flight,
              meaning up to 5 messages individually sent at the same time
            * after this, if more messages have to be sent while others are in flight,
              Kafka is smart and will start batching them while they wait,
              to send them all at once
        This smart batching allows Kafka to increase throughput
            while maintaining very low latency
        Batches have a higher compression ratio, so better efficiency
            all messages in a batch are sent in one request

        How to control batching?
            * linger.ms - the number of ms a producer is willing to wait
              before sending a batch out (default 0)
              by introducing some lag (for example linger.ms=5) we increase the
              chances of messages being sent together in a batch
            * if a batch is full (see batch.size) before the end of the linger.ms period,
              it will be sent to Kafka right away
                  the default is 16Kb (can increase to 32/64)
              any message bigger than batch.size will not be batched
              a batch is allocated per partition
        Note: you can monitor the average batch size metric (batch-size-avg) using
            * Kafka Producer Metrics

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-4-high-throughput-producer
        * add "snappy" compression (good for mainly text data)
        * batch size to 32Kb
        * linger.ms to 20ms

        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-default-partitions-and-key-hashing
        by default, keys are hashed using the
            * murmur2 algorithm

        It is possible to override the behavior of the partitioner via
            * partitioner.class
        The formula is:
            * targetPartition = Utils.abs(Utils.murmur2(record.key())) % numPartitions;
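
        a custom partitioner implements the org.apache.kafka.clients.producer.Partitioner
        interface and is registered via partitioner.class - a minimal sketch (not from the
        course; the class name and routing rule here are made up for illustration):

            import java.util.Map;
            import org.apache.kafka.clients.producer.Partitioner;
            import org.apache.kafka.common.Cluster;

            public class SimpleHashPartitioner implements Partitioner {
                @Override
                public int partition(String topic, Object key, byte[] keyBytes,
                                     Object value, byte[] valueBytes, Cluster cluster) {
                    int numPartitions = cluster.partitionsForTopic(topic).size();
                    // route by the key's Java hashCode instead of murmur2 (illustration only)
                    return key == null ? 0 : (key.hashCode() & 0x7fffffff) % numPartitions;
                }

                @Override
                public void close() {}

                @Override
                public void configure(Map<String, ?> configs) {}
            }

            // registered with:
            // properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, SimpleHashPartitioner.class.getName());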

        If the producer produces faster than the broker can take,
        the records will be buffered in memory
            * buffer.memory = 33554432 (32Mb): the size of the send buffer

        If the buffer is full (32Mb), then the .send() method will start to block
        (won't return right away)
            * max.block.ms = 60000 (1 min) - the time .send() will block
              until throwing an exception, for one of a few reasons:
                  * the producer has filled up its buffer
                  * the broker is not accepting any new data
                  * 60 seconds have elapsed

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/refactoring-the-project
        create maven modules:
            * kafka-basics
            * kafka-producer-twitter

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/setting-up-elasticsearch-in-the-cloud
        https://bonsai.io/
        credentials, interactive console

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/elasticsearch-101
        https://www.elastic.co/guide/en/elasticsearch/reference/current/_cluster_health.html
            /_cat/health?v
            /_cat/nodes?v
            /_cat/indices?v
        PUT
            /twitter
                {}
            /twitter/tweets/1
                {
                  "course": "Kafka for beg...",
                  "instructor": "Stephane Maarek",
                  "module": "ElasticSearch"
                }
        GET
            /twitter/tweets/1
            {
                "_index": "twitter",
                "_type": "tweets",
                "_id": 1,
                "_version": 2,
                "found": true,
                "_source": {
                  "course": "Kafka for beg...",
                  "instructor": "Stephane Maarek",
                  "module": "ElasticSearch"
                }
            }
        DELETE
            /twitter

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-1-set-up-project
        https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-maven.html
            dep: org.elasticsearch.client:elasticsearch-rest-high-level-client:6.4.0

        public class ElasticSearchConsumer {
            public static RestHighLevelClient createClient() {
                // https://<username>:<password>@<hostname>
                String hostname = "";
                String username = "";
                String password = "";
                // !!! the credentials provider is not needed for a local ES
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                      new UsernamePasswordCredentials(username, password));
                RestClientBuilder builder = RestClient.builder(
                      new HttpHost(hostname, 443, "https"))
                      .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                          @Override
                          public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                              return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                          }
                      });

                RestHighLevelClient client = new RestHighLevelClient(builder);
                return client;
            }

            public static void main(String[] args) throws IOException {
                RestHighLevelClient client = createClient();
                String jsonString = "{\"foo\": \"bar\"}";
                IndexRequest indexRequest = new IndexRequest("twitter", "tweets")
                    .source(jsonString, XContentType.JSON);
                // IOException
                IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
                String id = indexResponse.getId();
                logger.info(id); // assumes an SLF4J logger field
                client.close();
            }
        }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-2-write-the-consumer-and-send-to-elasticsearch
        ...
        // String topic   = "twitter_tweets";
        public static KafkaConsumer<String, String> createConsumer(String topic) {
            ...
            String groupId = "kafka-demo-elasticsearch";
            ...
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            ...
            // "earliest" is equivalent to the "--from-beginning" CLI option
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");
            ...
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            consumer.subscribe(Arrays.asList(topic));
            return consumer;
        }
        ...
        public static void main(String[] args) {
            ...
            while (true) {
                // java.time.Duration
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // record .key(), .value(), .partition(), .offset()

                    String jsonString = record.value();

                    IndexRequest indexRequest = new IndexRequest(
                        "twitter", // index
                        "tweets"   // type
                    ).source(jsonString, XContentType.JSON);

                    IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
                    String id = indexResponse.getId();
                    logger.info(id);

                    Thread.sleep(1000); // just for the demo (main needs throws InterruptedException)
                }
            }
            // client.close();
        }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-3-delivery-semantics
        // making the consumer idempotent by choosing an id
        ...
                    IndexRequest indexRequest = new IndexRequest(
                        "twitter", // index
                        "tweets",  // type
                        id         // !!!
                    )...
        ...
                    // 2 strategies
                    //
                    // 1. kafka generic id
                    // String id = record.topic() + "_" + record.partition() + "_" + record.offset();
                    //
                    // 2. twitter-specific id
                    String id = extractIdFromTweet(record.value());

                    IndexRequest indexRequest = new IndexRequest(
                        "twitter", // index
                        "tweets",  // type
                        id         // this is what makes the consumer idempotent
                    ).source(jsonString, XContentType.JSON);
        ...
            // google gson
            private static JsonParser jsonParser = new JsonParser();

            private static String extractIdFromTweet(String tweetJson) {
                return jsonParser.parse(tweetJson)
                    .getAsJsonObject()
                    .get("id_str")
                    .getAsString();
            }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/delivery-semantics-for-consumers
        At most once
            offsets are committed as soon as the message batch is received.
            If the processing goes wrong, the message will be lost
            (it won't be read again)

        At least once (see the sketch below)
            offsets are committed after the message is processed
            if the processing goes wrong, the message will be read again
            This can result in duplicate processing of messages.
            Make sure your processing is idempotent

        Exactly once
            only for Kafka => Kafka workflows
            using the Kafka Streams API
            for Kafka => Sink workflows, use an idempotent consumer
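
        a minimal at-least-once sketch (not from the course; assumes the consumer setup above):
        disable auto-commit and commit offsets only after the whole batch has been processed

            properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            ...
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // process(record) - must be idempotent, a record may be seen twice
                }
                consumer.commitSync(); // commit only after processing succeeded
            }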

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/delivery-semantics-for-consumers
        1:00
        ...
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");