Ihar Hancharenka 5dff80e88e first
2023-03-27 16:52:17 +03:00

95 строки
4.4 KiB
Plaintext

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring_cloud_function
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names
<functionName> + -<in|out>- + <index>
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binding-properties
org.springframework.cloud.stream.config.BindingProperties
org.springframework.cloud.stream.binder.ConsumerProperties
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_advanced_consumer_configuration
ListenerContainerCustomizer
MessageSourceCustomizer
org.springframework.cloud.stream.binder.ProducerProperties
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_advanced_producer_configuration
ProducerMessageHandlerCustomizer
???
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_using_channel_interceptors_with_streambridge
https://github.com/spring-cloud/spring-cloud-stream-samples
https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
https://github.com/spring-cloud/spring-cloud-stream
https://stackoverflow.com/questions/65576021/enablebinding-deprecated-as-of-3-1-in-favor-of-functional-programming-model
2019
https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified
package org.springframework.cloud.function.context.catalog;
public class SimpleFunctionRegistry implements FunctionRegistry, FunctionInspector {
...
/**
 * Post-processes the result of a function invocation using the original input.
 *
 * The result is returned untouched unless all of the following hold: {@code result} is
 * non-null, {@code result} is not a {@code Publisher}, and {@code input} is a {@code Message}.
 * When they do hold:
 * <ul>
 * <li>if a {@code functionInvocationHelper} is present and the input is a cloud event, the
 * result is replaced by {@code functionInvocationHelper.postProcessResult(result, input)};</li>
 * <li>otherwise, if the result is already a {@code Message}, the input headers (as returned by
 * {@code sanitizeHeaders}) are added to the result's headers in place, only for keys the
 * result does not already have;</li>
 * <li>otherwise, unless {@code outputType} is a collection of messages, the result is wrapped
 * in a new {@code Message} whose headers are copied from the sanitized input headers.</li>
 * </ul>
 *
 * @param input the object the function was invoked with
 * @param result the object the function returned
 * @return the possibly replaced or wrapped result; the same {@code result} reference when no
 * enrichment applies
 */
private Object enrichInvocationResultIfNecessary(Object input, Object result) {
// Enrichment only applies to a non-null, non-Publisher result produced from a Message input.
if (result != null && !(result instanceof Publisher) && input instanceof Message) {
if (result instanceof Message) {
// Cloud-event input: delegate post-processing to the helper (when one is configured).
if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) {
result = functionInvocationHelper.postProcessResult(result, (Message) input);
}
else {
// Reads the field referenced by headersField from the result's headers object via reflection
// and treats it as a mutable map.
// NOTE(review): presumably the backing map of the message headers - confirm where headersField is initialised.
Map<String, Object> headersMap = (Map<String, Object>) ReflectionUtils
.getField(SimpleFunctionRegistry.this.headersField, ((Message) result).getHeaders());
// putIfAbsent: headers already present on the result take precedence over input headers.
this.sanitizeHeaders(((Message) input).getHeaders()).forEach((k, v) -> headersMap.putIfAbsent(k, v));
}
}
else {
// Result is a plain (non-Message) object.
if (functionInvocationHelper != null && CloudEventMessageUtils.isCloudEvent(((Message) input))) {
result = functionInvocationHelper.postProcessResult(result, (Message) input);
}
else if (!FunctionTypeUtils.isCollectionOfMessage(this.outputType)) {
// Wrap the plain result in a Message and copy the sanitized input headers onto it.
result = MessageBuilder.withPayload(result).copyHeaders(this.sanitizeHeaders(((Message) input).getHeaders())).build(); // !!!
}
}
}
return result;
}
...
}
package org.springframework.cloud.stream.function;
...
public class FunctionConfiguration {
...
/**
 * Looks up every {@code BindableProxyFactory} bean in the application context and, for each
 * one that resolves to a non-supplier function, binds that function to the factory's
 * destinations via {@code bindFunctionToDestinations}.
 *
 * The function definition comes from the factory itself when it is a
 * {@code BindableFunctionProxyFactory}, and from {@code functionProperties} otherwise.
 *
 * @throws Exception declared by the overridden signature
 */
@Override
public void afterPropertiesSet() throws Exception {
Map<String, BindableProxyFactory> beansOfType = applicationContext.getBeansOfType(BindableProxyFactory.class);
// Keep the discovered factories in a field as an array.
this.bindableProxyFactories = beansOfType.values().toArray(new BindableProxyFactory[0]);
for (BindableProxyFactory bindableProxyFactory : this.bindableProxyFactories) {
// Function-specific factories carry their own definition; others fall back to the configured one.
String functionDefinition = bindableProxyFactory instanceof BindableFunctionProxyFactory
? ((BindableFunctionProxyFactory) bindableProxyFactory).getFunctionDefinition()
: this.functionProperties.getDefinition();
boolean shouldNotProcess = false;
if (!(bindableProxyFactory instanceof BindableFunctionProxyFactory)) {
// Skip a non-function factory whose first output binding (in the set's iteration order)
// is named "applicationMetrics".
// NOTE(review): presumably the metrics-emitter binding, which should not get a function bound - confirm.
Set<String> outputBindingNames = bindableProxyFactory.getOutputs();
shouldNotProcess = !CollectionUtils.isEmpty(outputBindingNames)
&& outputBindingNames.iterator().next().equals("applicationMetrics");
}
if (StringUtils.hasText(functionDefinition) && !shouldNotProcess) {
FunctionInvocationWrapper function = functionCatalog.lookup(functionDefinition);
// Only functions found in the catalog that are not suppliers are bound here.
if (function != null && !function.isSupplier()) {
this.bindFunctionToDestinations(bindableProxyFactory, functionDefinition);
}
}
}
}
// !!!
/**
 * Binds the function identified by {@code functionDefinition} to the destinations exposed by
 * the given proxy factory. Only the opening statements are shown in this excerpt; the rest of
 * the body is elided.
 *
 * @param bindableProxyFactory factory supplying the input and output binding names
 * @param functionDefinition definition of the function to bind
 */
private void bindFunctionToDestinations(BindableProxyFactory bindableProxyFactory, String functionDefinition) {
// Runs the binding pre-check before any binding names are read.
this.assertBindingIsPossible(bindableProxyFactory);
// Names of the input and output bindings declared by the factory.
Set<String> inputBindingNames = bindableProxyFactory.getInputs();
Set<String> outputBindingNames = bindableProxyFactory.getOutputs();
...
}
...
}