Ihar Hancharenka 5dff80e88e first
2023-03-27 16:52:17 +03:00

475 строки
19 KiB
Plaintext

2021
https://www.linkedin.com/learning/instructors/kathy-flint?u=2113185
https://www.linkedin.com/learning/spring-spring-integration
! 1h23m
https://www.linkedin.com/learning/spring-spring-integration/what-can-spring-integration-do
! spring integration enables light-weight messaging within spring app
https://www.linkedin.com/learning/spring-spring-integration/using-the-exercise-files
https://github.com/kdflint/spring-spring-integration-2848254
https://www.linkedin.com/learning/spring-spring-integration/essential-spring-integration-components
https://www.enterpriseintegrationpatterns.com
1. message itself
2. message channel (conduit, pathway, ...)
3. endpoint
https://www.linkedin.com/learning/spring-spring-integration/set-up-a-spring-integration-project
https://www.linkedin.com/learning/spring-spring-integration/create-a-message-in-spring-integration
message can have meta-info
Message<T>
getHeaders()
MessageHeader(Map)
getId()
get()
contains()
...
getPayload()
<T>
while instantiated, message is immutable
org.springframework.messaging.support.MessageBuilder<T>
02_01b
...
AppSupportStatus status = new AppSupportStatus(
props.getRuntimeProperties().getProperty("software.build"), new Date);
// fromMessage
// createMessage
// withPayload
GenericMessage message = (GenericMessage)MessageBuilder.withPayload(status)
// .setBlaBla()
.build()
https://www.linkedin.com/learning/spring-spring-integration/create-a-channel-in-spring-integration
Producer
Consumer
Channel
conduit ... between producers and consumers
i-face
org.springframework.messaging.MessageChannel
SubscribableChannel
subscribe(MessageHandler)
PollableChannel
receive()
MessageChannel guarantees that every channel has a capacity to send messages
Both Subscribable/Pollaber guarantees capacity to deliver messages
02_02b
<int:channel id="techSupportChannel"/>
// DirectChannel - single-threaded communication pipe
AbstractSubscribableChannel techSupportChannel = (DirectChannel) ... get from spring context "techSupportChannel"
techSupportChannel.send(message);
...
at distinct message
AbstractSubscribableChannel techSupportChannel = (DirectChannel) ... get from spring context "techSupportChannel"
techSupportChannel.subscribe(new ViewMessageHandler());
https://www.linkedin.com/learning/spring-spring-integration/choose-the-right-channel-for-the-job
Subscribable
Pollable
Queue
List of messages that are stored to be retrievable in a definite order
Channel as EIP (connective tissue of a messaging system)
An admin must configure a messaging system with the channels
that define the paths of communication between the applications
DirectChannel
single-subscriber recepient, single-threaded behaviour
PublishSubscribeChannel
broadcast to all subscribers
FixedSubscriberChannel
one subscriber, unsubscribeable
QueueChannel
pollable, FIFO, single-consumer recepient, configurable blocking behaviour
PriorityChannel
pollable, prioritized, single-consumer recepient, configurable blocking behaviour
RendezvousChannel
Pollable, sender blocks untill consumer request, synchronous behaviour
ExecutorChannel
Pollable, single consumer recepient, delegated dispatch, asynchronous
FluxChannel
ReactiveStreamsSubscribable, org.reactivestreams.Publisher, Allows reactive consumption
02_03b
META_INF.spring/integration/tech-support.xml
<int:channel id="updateNotificationQueueChannel">
<int:queue capacity="5"/>
</int:channel>
public class TechSupportService {
private Timer = new Timer();
private AbstractSubscribableChannel techSupportChannel;
private PollableChannel updateNotificationChannel;
public TechSupportService() {
updateNotificationChannel = (PollableChannel)
DashboardManager.getDashboard().getBean("updateNotificationQueueChannel")
this.start();
}
private void start() {
timer.schedule(new TimerTask(){
public void run() {
checkVersionCurrency();
}
}, 10000, 10000);
}
private void checkVersionCurrency() {
updateNotificationChannel.send(
MessageBuilder.withPayload("app update required").build(), 1000);
}
}
public class InterviewService {
...
private void start() {
timer.schedule(() -> { checkForNotifications() }, 3000, 3000);
}
private void checkForNotifications() {
Message msg = updateNotificationChannel.receive(1000);
if (msg != null) {
... with msg.getPayload().toString()
}
}
}
https://www.linkedin.com/learning/spring-spring-integration/challenge-refactor-a-spring-integration-subscribablechannel
02_04b
https://www.linkedin.com/learning/spring-spring-integration/solution-refactor-a-spring-integration-subscribablechannel
public class ViewService {
private AbstractSubscribeChannel techSupportChannel;
public ViewSource() {
techSupportChannel = (PublishSubscribeChannel) ... getBean(..).;
techSupportChannel.subscribe(new ViewMessagehandler());
}
...
}
public class DashboardManager() {
...
private void initializeDashboard() {
AbstractSubscribableChannel techSupportChannel = (PublishSubscribeChannel) ... getBean(...);
}
}
public class TechSupportService() {
...
public TechSupportService() {
techSupportChannel = (PublishSubscribeChannel) ... getBean(...);
techSupportChannel.subscribe(new ServiceMessageHandler());
}
}
Endpoint as EIP
Producer|Consumer - core IPs
https://www.linkedin.com/learning/spring-spring-integration/capabilities-of-a-spring-integration-endpoint
Foundational Concepts
Endpoint
* Conceptual interface between the messaging framework (channels)
and core application functionality
* Component, or collection of components, that
manipulate a message and make it consumable by
the overall business process
* A very broad descriptor of behaviour
Endpoint
* Component, or collection of components, that
manages message delivery in a way that
meets the design needs of the application
Handler
* is a lightweight interface that defines
a simple contract for handling a message
...
Channel -> <<Handler>> -> Application Component
...
interface org.springframework.messaging.MessageHandler
handleMessage()
Filter
* decide whether a message should be passed along
or dropped based on some criteria.
filter is constructed by injection of message selector
with filtering logic
MessageFilter (implements MessageHandler)
has a Selector injected
org.springframework.integration.filter package
Transform
* modify a message so that it can meet
the contractual (type) expectations of a consumer
MessageTransformingHandler (implements MessageHandler)
has a Transformer injected
org.springframework.integration.transformation package
Route
* consume messages from a message channel
and forward to one or more different message channels,
depending on a set of conditions
AbstractMessageRouter (implements MessageHandler)
org.springframework.integration.router package
Split/Aggregate
* partition a message into several parts or
combine multiple messages into a single message,
sending the resulting messages
to be processed independently
AbstractMessageSplitter (implements MessageHandler)
AbstractCorrelatingMessageHandler (implements MessageHandler)
org.springframework.integration.??? package
Service Activator Pattern
* a Service Activator is both an EI pattern
and a specific Spring Integration Implementation
* Service Activators connect to the messages on a channel
to the service being accessed
Adapter Pattern
* Allows the interface of an existing class
to be used as a different interface
Gateway Pattern
* encapsulates access to an external system or resource,
separating the messaging framework from application code
3.2
https://www.linkedin.com/learning/spring-spring-integration/filter-spring-integration-messages
03_02b
<int:publish-subscribe-channel id="textSupportChannel"/>
<int:channel id="updateNotificationQueueChannel">
<int:queue capacity="5"/>
</int:channel>
<int:filter input-channel="textSupportChannel"
output-channel="updateNotificationQueueChannel"
reference="selector" />
<beans:bean id="selector" class="...StatusMonitorService.ServiceMessageFilter"/>
...
public static class ServiceMessageFilter extends TechSupportMessageFilter {
protected boolean filterMessage(AppSupportStatus status) {
return status.isUpdateRequired();
}
}
...
public abstract Class TechSupportMessageFilter implements MessageSelector {
@Overrride
public boolean accept(Message<?> message) throws MessagingException {
Object payload = message.getPayload();
if (payload instanceof AppSupportStatus) {
return filterMessage((AppSupportStatus) payload);
} else {
throw new MessageRejectingException(message, "Unknown data type has been received.");
}
}
protected abstract boolean filterMessage(AppSupportStatus status);
}
3.3
https://www.linkedin.com/learning/spring-spring-integration/transform-spring-integration-messages
03_03b
private void checkClientStatus() {
...
String rawJson = simulateRestApiCall();
System.out.println(rawJson)
...
}
<int:channel id="rawApiJsonChannel" />
<int:json-to-object-transformer input-channel="rawApiJsonChannel"
output-channel="statusMonitorChannel"
type="com.lil.springintegration.util.AppSupportStatus"/>
<int:publish-subscribe-channel id="statusMonitorChannel"/>
...
private AppSupportStatus currentLocalStatus;
private AbstractSubscribableChannel statusMonitorChannel;
private QueueChannel updateNotificationQueueChannel;
private DirectChannel apiInputChannel;
public StatusMonitorService() {
updateNotificationQueueChannel = (QueueChannel) DashboardManager.getDashboardContext().getBean("updateNotificationQueueChannel");
apiInputChannel = (DirectChannel) DashboardManager.getDashboardContext().getBean("rawApiJsonChannel");
statusMonitorChannel = (PublishSubscribeChannel) DashboardManager.getDashboardContext().getBean("statusMonitorChannel");
statusMonitorChannel.subscribe(new ServiceMessageHandler);
this.start();
}
private void start() {
}
...
{
...
apiInputChannel.send(MessageBuilder.withPayload(rawJson).build());
}
https://www.linkedin.com/learning/spring-spring-integration/challenge-configure-a-service-activator
https://www.linkedin.com/learning/spring-spring-integration/solution-configure-a-service-activator
03_04b
<int:service-activator input-channel="statusMonitorChannel"
ref="accountCreditService"
method="creditCustomerAccount" />
https://www.linkedin.com/learning/spring-spring-integration/using-spring-integration-to-connect-with-external-systems
Adapter Pattern
Allows the interface of an existing class
to be used as a different interface
Gateway Pattern
Encapsulates access to an external system or resource,
separating the messaging framework from application code
Adpter
One way (support directed data flow)
Enables connecting a single consumer to a message channel
In = Send end of the cnannel
Out = Receive end of the channel
Gateway
Bidirectional
Hides the Spring integration messaging API
from the application business logic
In = Server side of the interaction
Out = Client side of the interaction
https://www.linkedin.com/learning/spring-spring-integration/jdbc-connectivity-using-spring-integration
04_02b
AppSupportStatus.java - our Domain Object
15: private ArrayList<LinkedCaseInsensitiveMap> deviceOut = new ArrayList();
jdbc.xml
<int:channel id="dataChannel" />
<--
int-jdbc:outbound-channel-adapter - unidirectional
outbound-gateway - bidirectional
-->
<int-jdbc:inbound-channel-adapter id="gridStatusPoller"
channel="dataChannel"
query="select display from device where isup=false"
data-source="dataSource" // at application.xml
auto-startup=false>
<int:poller fixed-delay="5000">
<!-- int:transactional -->
</int:poller>
</int-jdbc:inbound-channel-adapter>
<int:transformer input-channel="dataChannel"
output-channel="statusMonitorChannel"
method="transform"
ref="transformer" />
<beans:bean id="transformer" class="com.lil.springintegration.endpoint.JdbcMessageTransformer" />
https://www.linkedin.com/learning/spring-spring-integration/integrate-twitter-using-spring-integration
04_03b
twitter.xml
<int-twitter:search-inbound-channel-adapter id="twitterPoller"
twitter-template="twitterAuth"
query="#sustainability #kineteconews"
channel="updateNotificationQueueChannel"
auto-startup="false">
<int:poller fixed-rate="7000" max-messages-per-poll="1"/>
</int-twitter:search-inbound-channel-adapter>
<bean:bean id="twitterAuth" class"org.springframework.social.twitter.api.impl.TwitterTemaplate...">
<beans:constructor-arg value="${twitter.oauth.consumerKey}"/>
<beans:constructor-arg value="${twitter.oauth.consumerSecret}"/>
<beans:constructor-arg value="${twitter.oauth.accessToken}"/>
<beans:constructor-arg value="${twitter.oauth.accessTokenSecret}"/>
</bean:bean>
application.xml
<import resource="integration/twitter.xml" />
ViewService.java
...
private void checkForNotifications() {
GenericMessage<?> message = (GenericMessage<?>)updateNotificationChannel.receive(...);
if (message != null) {
if (message.getPayload() instanceof AppSupportStatus) {
AppSupportStatus payload = (AppSupportStatus) message.getPayload();
DashboardManager.setDashboardStatus("softwareNotification", payload.getCustomerSof...);
DashboardManager.setDashboardStatus("deviceNotification", payload.getCustomerDevic...);
} if (message.getPayload() instanceof Tweet) {
Tweet payload = (Tweet) message.getPayload();
System.out.println("News! " + payload.getText());
DashboardManager.setDashboardStatus("latestTweeets", payload.getText());
}
}
}
https://www.linkedin.com/learning/spring-spring-integration/call-a-rest-api-using-spring-integration
04_04b
...
StatusMonitorService {
...
private void start() {
timer.schedule(() -> { checkClientStatus(); }, 1000, 1000);
}
private void checkClientStatus() {
String rawJson = simulateRestApiCall();
apiInputChannel.send(MessageBuilder
.withPayload(rawJson)
.build());
}
...
}
http-api.xml
<int:channel id="pollingSignalChannel"/>
<int:inbound-channel-adapter id="apiPoller"
channel="pollingSignalChannel"
expression="''"
auto-startup="false">
<int:poller fixed-rate="3000"/>
<int:inbound-channel-adapter>
<int-http:outbound-gateway id="apiGateway"
reply-channel="apiInputChannel"
reply-timeout="5000"
expected-response-type="java.lang.String"
url="http://localhost:9090/api"
http-method="GET"
request-channel="pollingSignalChannel" />
https://www.linkedin.com/learning/spring-spring-integration/advanced-spring-integration