From 9535bd43038d56c3f7787f0ce3c4b35bf34a7909 Mon Sep 17 00:00:00 2001 From: Ihar Hancharenka Date: Fri, 18 Apr 2025 13:36:04 +0300 Subject: [PATCH] m --- net/proto/email/tuta.txt | 1 + .../kafka/features/internals/poll-listen.txt | 221 ++++++++++++++++++ 2 files changed, 222 insertions(+) create mode 100644 net/proto/email/tuta.txt create mode 100644 pl/cross/messaging/kafka/features/internals/poll-listen.txt diff --git a/net/proto/email/tuta.txt b/net/proto/email/tuta.txt new file mode 100644 index 000000000..fa1d0c110 --- /dev/null +++ b/net/proto/email/tuta.txt @@ -0,0 +1 @@ +https://tuta.com/ diff --git a/pl/cross/messaging/kafka/features/internals/poll-listen.txt b/pl/cross/messaging/kafka/features/internals/poll-listen.txt new file mode 100644 index 000000000..a4844dd06 --- /dev/null +++ b/pl/cross/messaging/kafka/features/internals/poll-listen.txt @@ -0,0 +1,221 @@ +org.springframework.kafka.listener.KafkaMessageListenerContainer + + private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback { + ... + private final Consumer consumer; + ... + ListenerConsumer(GenericMessageListener listener, ListenerType listenerType, + ObservationRegistry observationRegistry) { + ... + this.consumer = + ... + // + // this.containerProperties.getKafkaConsumerProperties(), this.propertyOverrides + // + Properties consumerProperties = propertiesFromConsumerPropertyOverrides(); + ... + KafkaMessageListenerContainer.this.consumerFactory.createConsumer( + this.consumerGroupId, + this.containerProperties.getClientId(), + KafkaMessageListenerContainer.this.clientIdSuffix, + consumerProperties); + ... + } + ... + protected void pollAndInvoke() { // !!! no interesting stuff before calling this + doProcessCommits(); + fixTxOffsetsIfNeeded(); + idleBetweenPollIfNecessary(); + if (!this.seeks.isEmpty()) { + processSeeks(); + } + enforceRebalanceIfNecessary(); + pauseConsumerIfNecessary(); + pausePartitionsIfNecessary(); + this.lastPoll = System.currentTimeMillis(); + if (!isRunning()) { + return; + } + this.polling.set(true); + ConsumerRecords records = doPoll(); // !!! ??? poll here + if (!this.polling.compareAndSet(true, false) && records != null) { + /* + * There is a small race condition where wakeIfNecessaryForStop was called between + * exiting the poll and before we reset the boolean. + */ + if (records.count() > 0) { + this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count()); + } + return; + } + if (!this.firstPoll && this.definedPartitions != null && this.consumerSeekAwareListener != null) { + this.firstPoll = true; + this.consumerSeekAwareListener.onFirstPoll(); + } + if (records != null && records.count() == 0 && this.isCountAck && this.count > 0) { + commitIfNecessary(); + this.count = 0; + } + debugRecords(records); + + invokeIfHaveRecords(records); // !!! here + if (this.remainingRecords == null) { + resumeConsumerIfNeccessary(); + if (!this.consumerPaused) { + resumePartitionsIfNecessary(); + } + } + } + ... + @Nullable + private ConsumerRecords doPoll() { + ConsumerRecords records; + if (this.isBatchListener && this.subBatchPerPartition) { + ... + } + else { + records = pollConsumer(); + if (this.remainingRecords != null) { + int howManyRecords = records.count(); + if (howManyRecords > 0) { + this.logger.error(() -> String.format("Poll returned %d record(s) while consumer was paused " + + "after an error; emergency stop invoked to avoid message loss", howManyRecords)); + KafkaMessageListenerContainer.this.emergencyStop.run(); + } + TopicPartition firstPart = this.remainingRecords.partitions().iterator().next(); + boolean isPaused = isPauseRequested() || isPartitionPauseRequested(firstPart); + this.logger.debug(() -> "First pending after error: " + firstPart + "; paused: " + isPaused); + if (!isPaused) { + records = this.remainingRecords; + this.remainingRecords = null; + } + } + captureOffsets(records); + checkRebalanceCommits(); + } + return records; + } + ... + private ConsumerRecords pollConsumer() { + beforePoll(); + try { + return this.consumer.poll(this.consumerPaused ? this.pollTimeoutWhilePaused : this.pollTimeout); // !!! + } + catch (WakeupException ex) { + return ConsumerRecords.empty(); + } + } + ... + private void debugRecords(@Nullable ConsumerRecords records) { + if (records != null) { + this.logger.debug(() -> "Received: " + records.count() + " records"); // ???? + if (records.count() > 0) { + this.logger.trace(() -> records.partitions().stream() + .flatMap(p -> records.records(p).stream()) + // map to same format as send metadata toString() + .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset()) + .toList() + .toString()); + } + } + } + ... + } + + +org.apache.kafka.clients.consumer.KafkaConsumer + -> debug + ... + private ConsumerRecords poll(Timer timer, boolean includeMetadataInTimeout) { + this.acquireAndEnsureOpen(); + + try { + this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs()); + if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) { + throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions"); + } else { + do { + this.client.maybeTriggerWakeup(); + if (includeMetadataInTimeout) { + this.updateAssignmentMetadataIfNeeded(timer, false); + } else { + while(!this.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE), true)) { + this.log.warn("Still waiting for metadata"); + } + } + + Fetch fetch = this.pollForFetches(timer); // !!! + if (!fetch.isEmpty()) { + if (this.fetcher.sendFetches() > 0 || this.client.hasPendingRequests()) { + this.client.transmitSends(); + } + + if (fetch.records().isEmpty()) { + this.log.trace("Returning empty records from `poll()` since the consumer's position has advanced for at least one topic partition"); + } + + ConsumerRecords var4 = this.interceptors.onConsume(new ConsumerRecords(fetch.records())); // ??? + return var4; + } + } while(timer.notExpired()); + + ConsumerRecords var8 = ConsumerRecords.empty(); + return var8; + } + } finally { + this.release(); + this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs()); + } + } + ... + private Fetch pollForFetches(Timer timer) { + long pollTimeout = coordinator == null ? timer.remainingMs() : + Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); + + // if data is available already, return it immediately + final Fetch fetch = fetcher.collectFetch(); + if (!fetch.isEmpty()) { + return fetch; + } + + // send any new fetches (won't resend pending fetches) + fetcher.sendFetches(); // ??? + + // We do not want to be stuck blocking in poll if we are missing some positions + // since the offset lookup may be backing off after a failure + + // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call + // updateAssignmentMetadataIfNeeded before this method. + if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) { + pollTimeout = retryBackoffMs; + } + + log.trace("Polling for fetches with timeout {}", pollTimeout); + + Timer pollTimer = time.timer(pollTimeout); + client.poll(pollTimer, () -> { + // since a fetch might be completed by the background thread, we need this poll condition + // to ensure that we do not block unnecessarily in poll() + return !fetcher.hasAvailableFetches(); + }); + timer.update(pollTimer.currentTimeMs()); + + return fetcher.collectFetch(); // ??? + } + ... + + +org.apache.kafka.clients.consumer.internals.Fetcher -> trace +org.apache.kafka.clients.consumer.KafkaConsumer -> debug +org.springframework.kafka.listener.KafkaMessageListenerContainer -> debug + + + + + + + + +