2022
Udemy - Apache Kafka Series - Kafka Streams for Data Processing
    https://www.udemy.com/course/kafka-streams/
    magnet:?xt=urn:btih:302b50a873ad4f997158ecad2f79f4f3e92c19f0
    !!! 895m
Apache Kafka Series - Learn Apache Kafka for Beginners v3
    https://www.udemy.com/course/apache-kafka/
    https://www.oreilly.com/library/view/apache-kafka-series/9781789342604/
Udemy - Maarek - Apache Kafka Series - Learn Apache Kafka for Beginners v3
    magnet:?xt=urn:btih:b9bd6bbfb5193f69128c6c34db379821a58d4de2
    magnet:?xt=urn:btih:1485c91b36e8d099640dc6180dc42fd0e4a8997e
    magnet:?xt=urn:btih:6f08ccb2d05668fb929dded3c8a5744d5976e7e1
    https://freecoursesite.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-37/
    https://course-downloader.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-4463
    ! 3.11g
????
Maarek - Apache Kafka Series - Learn Apache Kafka for Beginners v2
    magnet:?xt=urn:btih:d18a88e411d34c627db87450b5631eeaed6cc193
    magnet:?xt=urn:btih:b2f4b134238fb62b1cb06182178c348efe550cb5
    magnet:?xt=urn:btih:ab0be03d946ff2cf02d1f078ec1bcb43f6a884f0
    magnet:?xt=urn:btih:47dd77a975720aa74d108a7be5ef0ef100a8f55b
    magnet:?xt=urn:btih:15b04ee6ab7bc26708ff8d59f65526bd13d822e4
    magnet:?xt=urn:btih:e67fcb091584d050ba794c51e50cf3360682149d
    magnet:?xt=urn:btih:f33d7bf93e72baf68e50bb5c0cdb055d395729fe
    magnet:?xt=urn:btih:96e54381ee4af7b2c40e81c50143e94b759a99fa
    https://tutsnode.net/apache-kafka-series-learn-apache-kafka-for-beginners-v2/
    https://course-downloader.com/apache-kafka-series-learn-apache-kafka-for-beginners-v2-4425
    https://github.com/conduktor/kafka-beginners-course
    https://github.com/conduktor/kafka-stack-docker-compose
    https://raw.githubusercontent.com/conduktor/kafka-stack-docker-compose/master/zk-single-kafka-single.yml
    https://github.com/conduktor/conduktor-platform
    https://github.com/conduktor/conduktor-platform/tree/main/example-local
    ! 3.16-10g
Udemy - Apache Kafka Series - Confluent Schema Registry & REST Proxy
    https://www.udemy.com/course/confluent-schema-registry/
    magnet:?xt=urn:btih:d67f45d8e3240f47a8efb5650b5b1ad46dde5ede
    ! 884.6m, 4h30m, interesting docker-compose
        landoop/fast-data-dev:cp3.3.0
????
2019
Maarek - Learn Apache Kafka for Beginners
    ! 7h32m
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/intro-to-apache-kafka
    https://github.com/simplesteph
    https://medium.com/@stephane.maarek
TBD:
    * Kafka Producer Metrics
    https://www.datadoghq.com/blog/monitoring-kafka-performance-metrics/
    https://medium.com/@aiiflo/kafka-producer-metrics-monitoring-key-metrics-aa25bafefcea
    https://docs.confluent.io/platform/current/kafka/monitoring.html#global-request-metrics
        batch-size-avg

theory
    broker (each broker is also a bootstrap server)
        has an integer ID
        contains certain topic partitions, but not all of them
        after connecting to any broker (called a bootstrap broker), a client is connected to the entire cluster (see the sketch below)
        each broker knows all the metadata (which topics and partitions exist in the cluster)
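
        a hedged sketch (not from the course) of broker discovery with AdminClient: one bootstrap broker is enough to learn about the whole cluster; the host 127.0.0.1:9092 is an assumption

            import java.util.Properties;
            import org.apache.kafka.clients.admin.AdminClient;
            import org.apache.kafka.clients.admin.AdminClientConfig;
            import org.apache.kafka.common.Node;

            Properties props = new Properties();
            // a single bootstrap broker - the client discovers the rest of the cluster from it
            props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // describeCluster() returns the metadata the bootstrap broker knows about
                // .get() throws ExecutionException, InterruptedException
                for (Node node : admin.describeCluster().nodes().get()) {
                    System.out.println("broker id=" + node.id() + " at " + node.host() + ":" + node.port());
                }
            }
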
    topics
        a topic is a particular stream of data
    partitions
        a partition is ordered; order is guaranteed only within a partition
        data is kept only for a limited amount of time (1 week by default)
        data is immutable
    offsets
        incremental id (unbounded)
    replication factor
        3 is usual, 2 is risky
    leader
        at any time only ONE broker can be the Leader for a given partition
        only that leader can receive and serve data for the partition
        the other brokers will synchronize the data - ISR (in-sync replica)
    send-mode, ACK
        the producer can choose to receive an ACK for data writes
        acks=0  , no ack, dangerous
            =1  , wait only for the Leader to ack, limited data loss
            =all, wait for all in-sync replicas, no data loss
    key of the message
        key=null -> round-robin
        all the messages with the same key go to the same partition
    consumers
        read data in order "within each partition"
    consumer-group
        each consumer within a group reads from exclusive partitions
        if there are more consumers than partitions, some consumers will be inactive
        each group has a "generation" and rebalances consumers upon adding/removing consumers in the group
        !!! by using a fresh consumer group, it is possible to read all the messages "again"
    consumer-offsets
        kafka stores the offsets at which a consumer group has been reading
        these offsets are committed live to the Kafka topic "__consumer_offsets"
        this is what gives rise to the notion of "delivery semantics"
    delivery semantics
        - at most once (offset is committed as soon as the message is received)
        - at least once (offset is committed only after the message has been processed; see the sketch below)
            the processing system needs to be idempotent in this case
        - exactly once, can be achieved for Kafka=>Kafka workflows using the Streams API
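
        a hedged sketch (not from the course): an at-least-once consumer loop that disables auto-commit and commits offsets only after processing; host, group, and topic names are assumptions

            import java.time.Duration;
            import java.util.Collections;
            import java.util.Properties;
            import org.apache.kafka.clients.consumer.ConsumerConfig;
            import org.apache.kafka.clients.consumer.ConsumerRecord;
            import org.apache.kafka.clients.consumer.ConsumerRecords;
            import org.apache.kafka.clients.consumer.KafkaConsumer;
            import org.apache.kafka.common.serialization.StringDeserializer;

            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "at-least-once-demo");
            props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // take control of offset commits: commit only after processing succeeds
            props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton("first-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // process the record here (must be idempotent: a crash before commitSync replays it)
                }
                consumer.commitSync(); // commit offsets only after the whole batch was processed
            }
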
    zookeeper
        in PROD, operate an odd number of zookeeper servers (3, 5, 7)
        one is the Leader, the others are Followers
        zk no longer stores consumer offsets, starting from Kafka > v0.10

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/intro-to-apache-kafka
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/apache-kafka-in-five-minutes
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-theory-overview
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/topics-partitions-and-offsets
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/brokers-and-topics
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/topic-replication
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producers-and-message-keys
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-and-consumer-group
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-offsets-and-delivery-semantics
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-broker-discovery
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/zookeeper
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-guarantees
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/macos-download-and-set-up-kafka-in-path
    ...
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-topics-cli
        kafka-topics
            common-stuff:
                --zookeeper <host>:2181

            --list
                list all existing topics
            --create (see the AdminClient sketch after this list)
                --topic <topic-name>
                --partitions 3
                --replication-factor 2 (cannot be more than the number of available brokers)
            --describe
                good table of partitions
            --delete
                marks a topic for deletion (does not delete it right away)
                (delete.topic.enable=true)
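
        a hedged sketch (not from the course): creating a topic programmatically with AdminClient, equivalent to kafka-topics --create; host, topic name, and counts are assumptions

            import java.util.Collections;
            import java.util.Properties;
            import org.apache.kafka.clients.admin.AdminClient;
            import org.apache.kafka.clients.admin.AdminClientConfig;
            import org.apache.kafka.clients.admin.NewTopic;

            Properties props = new Properties();
            props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // 3 partitions, replication factor 1 (single-broker dev setup)
                NewTopic topic = new NewTopic("first-topic", 3, (short) 1);
                admin.createTopics(Collections.singleton(topic)).all().get(); // throws ExecutionException, InterruptedException
            }
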
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-console-producer-cli
        kafka-console-producer
            common-stuff:
                --broker-list <host>:9092
            --producer-property
                acks=all
            --topic <topic-name>
                line-by-line till Ctrl-C
                Note: a new topic is created if needed (LEADER_NOT_AVAILABLE WARN)
                      with the default number of partitions and replication factor
        config/server.properties
            num.partitions=1 # default
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-console-consumer-cli
        kafka-console-consumer
            common-stuff:
                --bootstrap-server <host>:9092
            --topic <topic-name>
            --from-beginning
                !!! by default, this consumer only reads new messages (those produced after it starts)
            ...
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-consumers-in-groups
        kafka-console-consumer
            ...
            --group <consumer-group-name>
                to be part of a consumer group
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-consumer-groups-cli
        kafka-consumer-groups
            common-stuff:
                --bootstrap-server <host>:9092
            --list
                ... console-consumer-<id>
            --describe <group-name>
                ... LAG 0 -> means the console consumer has read all the data
            ...
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/resetting-offsets
        kafka-consumer-groups
            ...
            --reset-offsets (see the AdminClient sketch below)
                --dry-run|--execute
                --by-duration
                --to-earliest
                ... scopes...
                --all-topics
                --topic <topic-name>
            --shift-by <num>
            --describe
                --group <group-name>
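
        a hedged sketch (not from the course): resetting a group's committed offsets programmatically via AdminClient.alterConsumerGroupOffsets (available in kafka-clients >= 2.5); host, group, and topic are assumptions; the group must have no active members

            import java.util.Collections;
            import java.util.Properties;
            import org.apache.kafka.clients.admin.AdminClient;
            import org.apache.kafka.clients.admin.AdminClientConfig;
            import org.apache.kafka.clients.consumer.OffsetAndMetadata;
            import org.apache.kafka.common.TopicPartition;

            Properties props = new Properties();
            props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // set the committed offset of partition 0 back to 0 (like --to-earliest for a single partition)
                TopicPartition tp = new TopicPartition("first-topic", 0);
                admin.alterConsumerGroupOffsets("my-first-application",
                        Collections.singletonMap(tp, new OffsetAndMetadata(0L)))
                     .all().get(); // throws ExecutionException, InterruptedException
            }
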
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/kafka-tool-ui
        www.kafkatool.com

    #
    # Java stuff
    #
        https://kafka.apache.org/documentation/

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/creating-a-kafka-project
        mvn deps: org.apache.kafka:kafka-clients
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer
        https://kafka.apache.org/documentation/#producerapi

        import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.ProducerConfig;
        import org.apache.kafka.clients.producer.ProducerRecord;
        import org.apache.kafka.common.serialization.StringSerializer;

        Properties properties = new Properties();
        // bootstrap.servers
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
        // key.serializer
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // value.serializer
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // <K, V>
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // key and other stuff can be here as well
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("first-topic", "some-value");

        // this is async !!!
        producer.send(record);

        // to actually send
        producer.flush();

        // flush and close
        producer.close();
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer-callbacks
        ...
        import org.apache.kafka.clients.producer.Callback;
        import org.apache.kafka.clients.producer.RecordMetadata;
        ...
        // send data asynchronously
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // executes every time a record is successfully sent or an exception is thrown
                if (e == null) {
                    // the record was successfully sent
                    // log recordMetadata.topic(), .partition(), .offset(), .timestamp()
                } else {
                    // log the exception
                }
            }
        });

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-producer-with-keys
        ...
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("first-topic", "some-key", "some-value");
        ...
        // send data asynchronously
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // executes every time a record is successfully sent or an exception is thrown
                if (e == null) {
                    // the record was successfully sent
                    // log recordMetadata.topic(), .partition(), .offset(), .timestamp()
                } else {
                    // log the exception
                }
            }
        }).get(); // block on .send() to make it synchronous. Don't do this in PROD...
          // throws ExecutionException, InterruptedException

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.serialization.StringDeserializer;

        Properties properties = new Properties();
        // bootstrap.servers
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
        // key.deserializer
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // value.deserializer
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // group.id
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-fourth-app");
        // auto.offset.reset
        // "earliest" is equivalent to the "--from-beginning" CLI option
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

        consumer.subscribe(Collections.singleton("first-topic")); // Arrays.asList("first-topic", "second-topic")

        while(true) {
            // java.time.Duration
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // poll(long) is deprecated; poll(Duration) is the Kafka >= 2.0 overload
            for (ConsumerRecord<String, String> record : records) {
                // record .key(), .value(), .partition(), .offset()
            }
        }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-inside-consumer-group
        ...
        groupId = "my-fifth-application"
        ...
        // !!! because we started a 2nd consumer, the group started rebalancing
        log:
        INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application]
            Attempt to heartbeat failed since group is rebalancing
        INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application]
            Revoking previously assigned partitions [first_topic-0, first_topic-1, first_topic-2]
        INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application]
            (Re-)joining group
        INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application]
            Successfully joined group with generation 2
        INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=my-fifth-application]
            Setting newly assigned partitions [first_topic-0, first_topic-1]

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-with-threads
        // let's get rid of the ugly while(true) loop
        import java.util.concurrent.CountDownLatch;
        ...
        public class ConsumerDemoWithThread {

            public static void main(String[] args) {
                new ConsumerDemoWithThread().run();
            }

            private ConsumerDemoWithThread() {
            }

            private void run() {
                Logger logger = LoggerFactory.getLogger(ConsumerDemoWithThread.class);
                ...
                groupId = "my-sixth-application"

                CountDownLatch latch = new CountDownLatch(1);

                Runnable myConsumerRunnable = new ConsumerRunnable(..., latch);

                Thread myThread = new Thread(myConsumerRunnable);
                myThread.start();

                Runtime.getRuntime().addShutdownHook(new Thread( () -> {
                    logger.info("Caught shutdown hook");
                    ((ConsumerRunnable) myConsumerRunnable).shutdown();
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    logger.info("Application has exited");
                }));

                try {
                    latch.await();
                } catch (InterruptedException e) {
                    logger.error("Application got interrupted", e);
                } finally {
                    logger.info("Application is closing");
                }
            }

            public class ConsumerRunnable implements Runnable {

                private static Logger logger = LoggerFactory.getLogger(ConsumerRunnable.class);

                private CountDownLatch latch;
                private KafkaConsumer<String, String> consumer;

                public ConsumerRunnable(String bootstrapServers, String groupId, String topic, CountDownLatch latch) {
                    this.latch = latch;

                    Properties properties = new Properties();
                    // bootstrap.servers
                    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                    // key.deserializer
                    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    // value.deserializer
                    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                    // group.id
                    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
                    // auto.offset.reset
                    // "earliest" is equivalent to the "--from-beginning" CLI option
                    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");

                    consumer = new KafkaConsumer<String, String>(properties);
                }

                @Override
                public void run() {
                    try {
                        while(true) {
                            // java.time.Duration
                            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                            for (ConsumerRecord<String, String> record : records) {
                                // record .key(), .value(), .partition(), .offset()
                            }
                        }
                    } catch (WakeupException e) {
                        logger.info("received shutdown exception");
                    } finally {
                        // super-important!
                        consumer.close();
                        // tell our main thread that we're done with the consumer
                        latch.countDown();
                    }
                }

                public void shutdown() {
                    // a special method to interrupt consumer.poll()
                    // it will throw WakeupException
                    consumer.wakeup();
                }
            }
        }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/java-consumer-seek-and-assign
        // assign and seek
        import org.apache.kafka.common.TopicPartition;
        ...
        remove the groupId property:
        // properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        and don't subscribe to the topic:
        // consumer.subscribe(Collections.singleton("first-topic")); // Arrays.asList("first-topic", "second-topic")

        // assign and seek are mostly used to replay data or fetch a specific message

        // assign
        TopicPartition partitionToReadFrom = new TopicPartition(topic, 0); // partition 0
        consumer.assign(Arrays.asList(partitionToReadFrom));

        // seek
        long offsetToReadFrom = 15L;
        consumer.seek(partitionToReadFrom, offsetToReadFrom); // this consumer should read from this partition and offset

        int numberOfMessagesToRead = 5;
        boolean keepOnReading = true;
        int numberOfMessagesReadSoFar = 0;

        while(keepOnReading) {
            // java.time.Duration
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // new in Kafka 2.0
            for (ConsumerRecord<String, String> record : records) {
                numberOfMessagesReadSoFar += 1;
                // record .key(), .value(), .partition(), .offset()
                if (numberOfMessagesReadSoFar >= numberOfMessagesToRead) {
                    keepOnReading = false; // to exit the while-loop
                    break; // to exit the for-loop
                }
            }
        }
        logger.info("Exiting the application");

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/client-bidirectional-compatibility
        // As of Kafka 0.10.2 (July 2017), clients and brokers have bi-directional compatibility
        - an OLDER client (e.g. 1.1) can talk to a NEWER broker (2.0)
        - a NEWER client (e.g. 2.0) can talk to an OLDER broker (1.1)

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/twitter-setup
        https://apps.twitter.com
        https://developer.twitter.com
            for personal use
        ...
            keys & tokens -> Consumer API keys (API key, API secret key)
            access token & access token secret
        https://github.com/twitter/hbc
            com.twitter:hbc-core

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-1-writing-a-twitter-client
        ...
        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.LinkedBlockingQueue;
        import java.util.concurrent.TimeUnit;

        public class TwitterProducer {
            private static Logger logger = LoggerFactory.getLogger(TwitterProducer.class);

            public TwitterProducer() {
            }

            public static void main(String[] args) {
                new TwitterProducer().run();
            }

            public void run() {
                // create a twitter client
                BlockingQueue<String> msgQueue = new LinkedBlockingQueue<String>(100000);
                Client client = createTwitterClient(msgQueue);

                // attempt to establish a connection
                client.connect();

                // create a kafka producer

                // loop to send tweets to kafka
                while (!client.isDone()) {
                    String msg = null;
                    try {
                        msg = msgQueue.poll(5, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        client.stop();
                    }
                    if (msg != null) {
                        logger.info(msg);
                    }
                }
                logger.info("End of application");
            }

            String consumerKey = "";
            // ... consumerSecret, token, tokenSecret

            public Client createTwitterClient(BlockingQueue<String> msgQueue) {
                // ...
                Hosts hosebirdHosts = new HttpHosts(Constants.STREAM_HOST);
                StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();

                List<String> terms = Lists.newArrayList("kafka");
                hosebirdEndpoint.trackTerms(terms);

                Authentication hosebirdAuth = new OAuth1(consumerKey, consumerSecret, token, tokenSecret);

                ClientBuilder builder = new ClientBuilder()
                    .name("...")
                    .hosts(hosebirdHosts)
                    .authentication(hosebirdAuth)
                    .endpoint(hosebirdEndpoint)
                    .processor(new StringDelimitedProcessor(msgQueue));

                Client hosebirdClient = builder.build();
                return hosebirdClient;
            }
        }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-2-writing-the-kafka-producer
        ...
        public KafkaProducer<String, String> createKafkaProducer() {
            Properties properties = new Properties();
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "...");
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            // <K, V>
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
            return producer;
        }
        ...
        // create a kafka producer
        KafkaProducer<String, String> producer = createKafkaProducer();

        // add a shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("stopping twitter client");
            client.stop();
            logger.info("closing producer");
            producer.close(); // KafkaProducer has no stop(); close() flushes and closes
            logger.info("done");
        }));

        // loop to send tweets to kafka
        // on a different thread, or multiple different threads...
        while (!client.isDone()) {
            String msg = null;
            try {
                msg = msgQueue.poll(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
                client.stop();
            }
            if (msg != null) {
                logger.info(msg);
                producer.send(new ProducerRecord<>("twitter_tweets", null, msg), new Callback() { // topic, key, value
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e != null) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                });
            }
        }

        $ kafka-topics --zookeeper 127.0.0.1:2181 --create --topic twitter_tweets --partitions 6 --replication-factor 1
          ... created topic twitter_tweets

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-configurations-introduction
        note: in the logs
        INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
            acks = 1
            batch.size = 16384
            bootstrap.servers = [127.0.0.1:9092]
            buffer.memory = 33554432
            client.id =
            compression.type = none
            enable.idempotence = false
            interceptor.classes = []
            key.serializer = org.apache.kafka.common.serialization.StringSerializer
            linger.ms = 0
            max.request.size =
            value.serializer =

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/acks-and-min-insync-replicas
        Producer Acks Deep Dive
            acks = 0 (no acks), useful where it's ok to potentially lose messages, nice for performance
            acks = 1 (leader acknowledgement)
            acks = all (leader + replicas), strongest guarantee, adds safety at the cost of latency
                must be used in conjunction with
                    min.insync.replicas - can be set at the broker or topic level (override); see the sketch below
                    min.insync.replicas = 2 (at least 2 brokers, incl. the leader, must respond)
                        Exception: NOT_ENOUGH_REPLICAS
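
        a hedged sketch (not from the course): setting min.insync.replicas at the topic level with AdminClient.incrementalAlterConfigs (kafka-clients >= 2.3); host and topic name are assumptions

            import java.util.Collections;
            import java.util.Properties;
            import org.apache.kafka.clients.admin.AdminClient;
            import org.apache.kafka.clients.admin.AdminClientConfig;
            import org.apache.kafka.clients.admin.AlterConfigOp;
            import org.apache.kafka.clients.admin.ConfigEntry;
            import org.apache.kafka.common.config.ConfigResource;

            Properties props = new Properties();
            props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // topic-level override of the broker default
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "twitter_tweets");
                AlterConfigOp setMinIsr = new AlterConfigOp(new ConfigEntry("min.insync.replicas", "2"), AlterConfigOp.OpType.SET);
                admin.incrementalAlterConfigs(Collections.singletonMap(topic, Collections.singletonList(setMinIsr)))
                     .all().get(); // throws ExecutionException, InterruptedException
            }
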
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/retries-and-max-in-flight-requests-per-connection
        Producer Retries
            in case of transient failures, developers are expected to handle exceptions,
            otherwise the data will be lost.
            Example of a transient failure:
                * NotEnoughReplicasException
            There is a "retries" setting
                * defaults to 0
                * you can increase it, even up to Integer.MAX_VALUE
            In case of retries, by default there is a chance that messages will be sent out of order
            (if a batch has failed to be sent)
            If you rely on key-based ordering, that can be an issue
            For this, you can set the setting which controls how many produce
            requests can be made in parallel (to any partition):
                max.in.flight.requests.per.connection
                    * default: 5
                    * set to 1 if you need to ensure ordering (may impact throughput)
            In Kafka >= 1.0.0 there is a better (and simpler) solution

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/idempotent-producer
        The producer can introduce duplicate messages in Kafka due to network errors:
        if the "ack" from Kafka does not make it back to the producer because of a network error,
        the producer issues a "retry" request and Kafka commits a duplicate

        In Kafka >= 0.11, you can define an "idempotent producer",
        which introduces a produce request id, so Kafka can detect and discard duplicates

        They (idempotent producers) come with
            * retries                = Integer.MAX_VALUE (2^31 - 1)
            * max.in.flight.requests = 1 (Kafka >= 0.11 and < 1.1)
            * max.in.flight.requests = 5 (Kafka >= 1.1 - higher performance)
            * acks = all

        just set:
            producerProps.put("enable.idempotence", "true");

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-3-safe-producer
        kafka < 0.11
            * acks = all
            * min.insync.replicas = 2 (broker/topic level)
                ensures at least two brokers in the ISR have the data after an ack
            * retries = MAX_INT (producer level)
                ensures transient errors are retried indefinitely
            * max.in.flight.requests.per.connection = 1 (producer level)

        kafka >= 0.11
            * enable.idempotence = true (producer level) + min.insync.replicas = 2 (broker/topic level)
                implies acks = all, retries = MAX_INT, max.in.flight.requests = 5 (default)
                while keeping ordering guarantees and improving performance!
                and also no duplicates

        !!! running an idempotent producer impacts throughput and latency, always test
        ...
            properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
            properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
            properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // for Kafka >= 1.1

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-compression
        compression.type
            * none (default)
            * gzip
            * lz4
            * snappy

        compression is more effective the bigger the batch of messages being sent to Kafka
            * benchmark here: https://blog.cloudflare.com/squeezing-the-firehose

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-batching
        ...
        by default, kafka tries to send data as soon as possible
            * it will have up to 5 requests in flight,
              meaning up to 5 messages individually sent at the same time
            * after this, if more messages have to be sent while others are in flight,
              Kafka is smart and will start batching them while they wait,
              so it can send them all at once
        This smart batching allows Kafka to increase throughput
            while maintaining very low latency
        Batches have a higher compression ratio, so better efficiency
            all messages in a batch are sent in one request

        How to control batching?
            * linger.ms - the number of ms a producer is willing to wait
              before sending a batch out (default 0)
              by introducing some lag (for example linger.ms=5) we increase the
              chances of messages being sent together in a batch
            * if a batch is full (see batch.size) before the end of the linger.ms period,
              it will be sent to Kafka right away
                  the default is 16 KB (can be increased to 32/64)
              any message bigger than batch.size will not be batched
              a batch is allocated per partition
        Note: you can monitor the average batch size metric using
            * Kafka Producer Metrics (see the sketch below)
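
        a hedged sketch (not from the course): reading the "batch-size-avg" metric from a running producer via its built-in metrics() map; assumes a "producer" like the one created above

            import java.util.Map;
            import org.apache.kafka.common.Metric;
            import org.apache.kafka.common.MetricName;

            // producer.metrics() exposes the same metrics that JMX does
            for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
                if ("batch-size-avg".equals(entry.getKey().name())) {
                    // metricValue() returns the current value (Kafka >= 1.0)
                    System.out.println(entry.getKey().group() + " " + entry.getKey().name() + " = " + entry.getValue().metricValue());
                }
            }
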
    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-part-4-high-throughput-producer
        * use "snappy" compression, mainly for text data
        * batch.size to 32 KB
        * linger.ms to 20 ms

        properties.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        properties.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20");
        properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024));

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/producer-default-partitions-and-key-hashing
        by default, keys are hashed using the
            * murmur2 algorithm

        It is possible to override the behavior of the partitioner
            * partitioner.class
        The formula is (see the runnable sketch below):
            * targetPartition = Utils.abs(Utils.murmur2(record.key())) % numPartitions;
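
        a hedged sketch (not from the course): computing the target partition for a key the way the default partitioner does; note that modern clients use Utils.toPositive rather than Utils.abs, and hash the serialized key bytes

            import java.nio.charset.StandardCharsets;
            import org.apache.kafka.common.utils.Utils;

            byte[] keyBytes = "some-key".getBytes(StandardCharsets.UTF_8);
            int numPartitions = 3; // an assumption; in the client this comes from the topic metadata
            // toPositive() masks off the sign bit, avoiding the Integer.MIN_VALUE pitfall of abs()
            int targetPartition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println("key goes to partition " + targetPartition);
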
        If the producer produces faster than the broker can take,
        the records will be buffered in memory
            * buffer.memory = 33554432 (32 MB): the size of the send buffer

        If the buffer is full (32 MB), then the .send() method will start to block
        (it won't return right away)
            * max.block.ms = 60000 (1 min) - the time .send() will block
              until throwing an exception, which happens when
                  * the producer has filled up its buffer,
                  * the broker is not accepting any new data,
                  * and 60 seconds have elapsed

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/refactoring-the-project
        create maven modules:
            * kafka-basics
            * kafka-producer-twitter

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/setting-up-elasticsearch-in-the-cloud
        https://bonsai.io/
        creds, interactive console

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/elasticsearch-101
        https://www.elastic.co/guide/en/elasticsearch/reference/current/_cluster_health.html
            /_cat/health?v
            /_cat/nodes?v
            /_cat/indices?v
        PUT
            /twitter
                {}
            /twitter/tweets/1
                {
                  "course": "Kafka for beg...",
                  "instructor": "Stephane Maarek",
                  "module": "ElasticSearch"
                }
        GET
            /twitter/tweets/1
            {
                "_index": "twitter",
                "_type": "tweets",
                "_id": 1,
                "_version": 2,
                "found": true,
                "_source": {
                  "course": "Kafka for beg...",
                  "instructor": "Stephane Maarek",
                  "module": "ElasticSearch"
                }
            }
        DELETE
            /twitter

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-1-set-up-project
        https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-getting-started-maven.html
            dep: org.elasticsearch.client:elasticsearch-rest-high-level-client:6.4.0

        public class ElasticSearchConsumer {

            private static Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class);

            public static RestHighLevelClient createClient() {
                // https://<username>:<password>@<hostname>
                String hostname = "";
                String username = "";
                String password = "";
                // !!! the credentials part is not needed for a local ES
                final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                credentialsProvider.setCredentials(AuthScope.ANY,
                      new UsernamePasswordCredentials(username, password));
                RestClientBuilder builder = RestClient.builder(
                      new HttpHost(hostname, 443, "https"))
                      .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                          @Override
                          public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                              return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                          }
                      });

                RestHighLevelClient client = new RestHighLevelClient(builder);
                return client;
            }

            public static void main(String[] args) throws IOException {
                RestHighLevelClient client = createClient();
                String jsonString = "{\"foo\": \"bar\"}";
                IndexRequest indexRequest = new IndexRequest("twitter", "tweets")
                    .source(jsonString, XContentType.JSON);
                // IOException
                IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
                String id = indexResponse.getId();
                logger.info(id);
                client.close();
            }
        }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-2-write-the-consumer-and-send-to-elasticsearch
        ...
        // String topic   = "twitter_tweets";
        public static KafkaConsumer<String, String> createConsumer(String topic) {
            ...
            String groupId = "kafka-demo-elasticsearch";
            ...
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            ...
            // "earliest" is equivalent to the "--from-beginning" CLI option
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");
            ...
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            consumer.subscribe(Arrays.asList(topic));
            return consumer;
        }
        ...
        public static void main(String[] args) throws IOException, InterruptedException {
            ...
            while(true) {
                // java.time.Duration
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // record .key(), .value(), .partition(), .offset()

                    String jsonString = record.value();

                    IndexRequest indexRequest = new IndexRequest(
                        "twitter", // index
                        "tweets"   // type
                    ).source(jsonString, XContentType.JSON);

                    IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
                    String id = indexResponse.getId();
                    logger.info(id);

                    Thread.sleep(1000); // just for the demo
                }
            }
            // client.close();
        }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/consumer-part-3-delivery-semantics
        // making it idempotent by supplying an id
        ...
                    IndexRequest indexRequest = new IndexRequest(
                        "twitter", // index
                        "tweets",  // type
                        id         // !!!
                    )...
        ...
                    // 2 strategies
                    //
                    // 1. kafka generic id
                    // String id = record.topic() + "_" + record.partition() + "_" + record.offset();
                    //
                    // 2. twitter-specific id
                    String id = extractIdFromTweet(record.value());

                    IndexRequest indexRequest = new IndexRequest(
                        "twitter", // index
                        "tweets",  // type
                        id         // this is what makes the consumer idempotent
                    ).source(jsonString, XContentType.JSON);
        ...
            // google gson
            private static JsonParser jsonParser = new JsonParser();

            private static String extractIdFromTweet(String tweetJson) {
                return jsonParser.parse(tweetJson)
                    .getAsJsonObject()
                    .get("id_str")
                    .getAsString();
            }

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/delivery-semantics-for-consumers
        At most once
            offsets are committed as soon as the message batch is received.
            If the processing goes wrong, the message will be lost
            (it won't be read again)

        At least once
            offsets are committed after the message is processed.
            If the processing goes wrong, the message will be read again.
            This can result in duplicate processing of messages.
            Make sure your processing is idempotent

        Exactly once
            only for Kafka => Kafka workflows,
            using the Kafka Streams API
            for Kafka => Sink workflows, use an idempotent consumer

    https://www.linkedin.com/learning/learn-apache-kafka-for-beginners/delivery-semantics-for-consumers
        1:00
        ...
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest|latest|none");