Этот коммит содержится в:
Ihar Hancharenka 2025-04-18 13:36:04 +03:00
родитель e2cd0c2c59
Коммит 9535bd4303
2 изменённых файлов: 222 добавлений и 0 удалений

1
net/proto/email/tuta.txt Обычный файл
Просмотреть файл

@ -0,0 +1 @@
https://tuta.com/

Просмотреть файл

@ -0,0 +1,221 @@
org.springframework.kafka.listener.KafkaMessageListenerContainer
private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
...
private final Consumer<K, V> consumer;
...
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
ObservationRegistry observationRegistry) {
...
this.consumer =
...
//
// this.containerProperties.getKafkaConsumerProperties(), this.propertyOverrides
//
Properties consumerProperties = propertiesFromConsumerPropertyOverrides();
...
KafkaMessageListenerContainer.this.consumerFactory.createConsumer(
this.consumerGroupId,
this.containerProperties.getClientId(),
KafkaMessageListenerContainer.this.clientIdSuffix,
consumerProperties);
...
}
...
protected void pollAndInvoke() { // !!! no interesting stuff before calling this
doProcessCommits();
fixTxOffsetsIfNeeded();
idleBetweenPollIfNecessary();
if (!this.seeks.isEmpty()) {
processSeeks();
}
enforceRebalanceIfNecessary();
pauseConsumerIfNecessary();
pausePartitionsIfNecessary();
this.lastPoll = System.currentTimeMillis();
if (!isRunning()) {
return;
}
this.polling.set(true);
ConsumerRecords<K, V> records = doPoll(); // !!! ??? poll here
if (!this.polling.compareAndSet(true, false) && records != null) {
/*
* There is a small race condition where wakeIfNecessaryForStop was called between
* exiting the poll and before we reset the boolean.
*/
if (records.count() > 0) {
this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
}
return;
}
if (!this.firstPoll && this.definedPartitions != null && this.consumerSeekAwareListener != null) {
this.firstPoll = true;
this.consumerSeekAwareListener.onFirstPoll();
}
if (records != null && records.count() == 0 && this.isCountAck && this.count > 0) {
commitIfNecessary();
this.count = 0;
}
debugRecords(records);
invokeIfHaveRecords(records); // !!! here
if (this.remainingRecords == null) {
resumeConsumerIfNeccessary();
if (!this.consumerPaused) {
resumePartitionsIfNecessary();
}
}
}
...
@Nullable
private ConsumerRecords<K, V> doPoll() {
ConsumerRecords<K, V> records;
if (this.isBatchListener && this.subBatchPerPartition) {
...
}
else {
records = pollConsumer();
if (this.remainingRecords != null) {
int howManyRecords = records.count();
if (howManyRecords > 0) {
this.logger.error(() -> String.format("Poll returned %d record(s) while consumer was paused "
+ "after an error; emergency stop invoked to avoid message loss", howManyRecords));
KafkaMessageListenerContainer.this.emergencyStop.run();
}
TopicPartition firstPart = this.remainingRecords.partitions().iterator().next();
boolean isPaused = isPauseRequested() || isPartitionPauseRequested(firstPart);
this.logger.debug(() -> "First pending after error: " + firstPart + "; paused: " + isPaused);
if (!isPaused) {
records = this.remainingRecords;
this.remainingRecords = null;
}
}
captureOffsets(records);
checkRebalanceCommits();
}
return records;
}
...
private ConsumerRecords<K, V> pollConsumer() {
beforePoll();
try {
return this.consumer.poll(this.consumerPaused ? this.pollTimeoutWhilePaused : this.pollTimeout); // !!!
}
catch (WakeupException ex) {
return ConsumerRecords.empty();
}
}
...
private void debugRecords(@Nullable ConsumerRecords<K, V> records) {
if (records != null) {
this.logger.debug(() -> "Received: " + records.count() + " records"); // ????
if (records.count() > 0) {
this.logger.trace(() -> records.partitions().stream()
.flatMap(p -> records.records(p).stream())
// map to same format as send metadata toString()
.map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
.toList()
.toString());
}
}
}
...
}
org.apache.kafka.clients.consumer.KafkaConsumer
-> debug
...
private ConsumerRecords<K, V> poll(Timer timer, boolean includeMetadataInTimeout) {
this.acquireAndEnsureOpen();
try {
this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());
if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
} else {
do {
this.client.maybeTriggerWakeup();
if (includeMetadataInTimeout) {
this.updateAssignmentMetadataIfNeeded(timer, false);
} else {
while(!this.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE), true)) {
this.log.warn("Still waiting for metadata");
}
}
Fetch<K, V> fetch = this.pollForFetches(timer); // !!!
if (!fetch.isEmpty()) {
if (this.fetcher.sendFetches() > 0 || this.client.hasPendingRequests()) {
this.client.transmitSends();
}
if (fetch.records().isEmpty()) {
this.log.trace("Returning empty records from `poll()` since the consumer's position has advanced for at least one topic partition");
}
ConsumerRecords var4 = this.interceptors.onConsume(new ConsumerRecords(fetch.records())); // ???
return var4;
}
} while(timer.notExpired());
ConsumerRecords var8 = ConsumerRecords.empty();
return var8;
}
} finally {
this.release();
this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
}
}
...
private Fetch<K, V> pollForFetches(Timer timer) {
long pollTimeout = coordinator == null ? timer.remainingMs() :
Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
// if data is available already, return it immediately
final Fetch<K, V> fetch = fetcher.collectFetch();
if (!fetch.isEmpty()) {
return fetch;
}
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches(); // ???
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
// NOTE: the use of cachedSubscriptionHasAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
log.trace("Polling for fetches with timeout {}", pollTimeout);
Timer pollTimer = time.timer(pollTimeout);
client.poll(pollTimer, () -> {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasAvailableFetches();
});
timer.update(pollTimer.currentTimeMs());
return fetcher.collectFetch(); // ???
}
...
org.apache.kafka.clients.consumer.internals.Fetcher -> trace
org.apache.kafka.clients.consumer.KafkaConsumer -> debug
org.springframework.kafka.listener.KafkaMessageListenerContainer -> debug
<!-- logger name="org.apache.kafka.clients.consumer.internals.Fetcher" level="TRACE" additivity="false">
<appender-ref ref="STDOUT" />
</logger-->
<logger name="org.apache.kafka.clients.consumer.KafkaConsumer" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT" />
</logger>
<logger name="org.springframework.kafka.listener.KafkaMessageListenerContainer" level="DEBUG" additivity="false">
<appender-ref ref="STDOUT" />
</logger>