зеркало из
https://github.com/iharh/notes.git
synced 2025-11-01 14:16:09 +02:00
475 строки
19 KiB
Plaintext
475 строки
19 KiB
Plaintext
2021
|
|
https://www.linkedin.com/learning/instructors/kathy-flint?u=2113185
|
|
https://www.linkedin.com/learning/spring-spring-integration
|
|
! 1h23m
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/what-can-spring-integration-do
|
|
! Spring Integration enables lightweight messaging within a Spring app
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/using-the-exercise-files
|
|
https://github.com/kdflint/spring-spring-integration-2848254
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/essential-spring-integration-components
|
|
https://www.enterpriseintegrationpatterns.com
|
|
1. message itself
|
|
2. message channel (conduit, pathway, ...)
|
|
3. endpoint
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/set-up-a-spring-integration-project
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/create-a-message-in-spring-integration
|
|
message can have meta-info
|
|
Message<T>
|
|
getHeaders()
|
|
MessageHeader(Map)
|
|
getId()
|
|
get()
|
|
contains()
|
|
...
|
|
getPayload()
|
|
<T>
|
|
|
|
once instantiated, a message is immutable
|
|
|
|
org.springframework.messaging.support.MessageBuilder<T>
|
|
|
|
02_01b
|
|
...
|
|
AppSupportStatus status = new AppSupportStatus(
|
|
props.getRuntimeProperties().getProperty("software.build"), new Date);
|
|
|
|
// fromMessage
|
|
// createMessage
|
|
// withPayload
|
|
GenericMessage message = (GenericMessage)MessageBuilder.withPayload(status)
|
|
// .setBlaBla()
|
|
.build()
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/create-a-channel-in-spring-integration
|
|
Producer
|
|
Consumer
|
|
Channel
|
|
conduit ... between producers and consumers
|
|
i-face
|
|
org.springframework.messaging.MessageChannel
|
|
SubscribableChannel
|
|
subscribe(MessageHandler)
|
|
PollableChannel
|
|
receive()
|
|
|
|
MessageChannel guarantees that every channel has the capacity to send messages
|
|
Both SubscribableChannel and PollableChannel guarantee the capacity to deliver messages
|
|
|
|
02_02b
|
|
|
|
<int:channel id="techSupportChannel"/>
|
|
|
|
// DirectChannel - single-threaded communication pipe
|
|
AbstractSubscribableChannel techSupportChannel = (DirectChannel) ... get from spring context "techSupportChannel"
|
|
techSupportChannel.send(message);
|
|
|
|
...
|
|
at distinct message
|
|
|
|
AbstractSubscribableChannel techSupportChannel = (DirectChannel) ... get from spring context "techSupportChannel"
|
|
techSupportChannel.subscribe(new ViewMessageHandler());
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/choose-the-right-channel-for-the-job
|
|
Subscribable
|
|
Pollable
|
|
Queue
|
|
List of messages that are stored to be retrievable in a definite order
|
|
|
|
Channel as EIP (connective tissue of a messaging system)
|
|
An admin must configure a messaging system with the channels
|
|
that define the paths of communication between the applications
|
|
|
|
DirectChannel
|
|
single-subscriber recipient, single-threaded behaviour
|
|
PublishSubscribeChannel
|
|
broadcast to all subscribers
|
|
FixedSubscriberChannel
|
|
one subscriber, cannot be unsubscribed
|
|
QueueChannel
|
|
pollable, FIFO, single-consumer recipient, configurable blocking behaviour
|
|
PriorityChannel
|
|
pollable, prioritized, single-consumer recipient, configurable blocking behaviour
|
|
RendezvousChannel
|
|
Pollable, sender blocks until a consumer requests the message, synchronous behaviour
|
|
ExecutorChannel
|
|
Pollable, single consumer recipient, delegated dispatch, asynchronous
|
|
FluxChannel
|
|
ReactiveStreamsSubscribable, org.reactivestreams.Publisher, Allows reactive consumption
|
|
|
|
02_03b
|
|
|
|
META-INF/spring/integration/tech-support.xml
|
|
<int:channel id="updateNotificationQueueChannel">
|
|
<int:queue capacity="5"/>
|
|
</int:channel>
|
|
|
|
public class TechSupportService {
|
|
private Timer = new Timer();
|
|
private AbstractSubscribableChannel techSupportChannel;
|
|
private PollableChannel updateNotificationChannel;
|
|
|
|
public TechSupportService() {
|
|
updateNotificationChannel = (PollableChannel)
|
|
DashboardManager.getDashboard().getBean("updateNotificationQueueChannel")
|
|
this.start();
|
|
}
|
|
private void start() {
|
|
timer.schedule(new TimerTask(){
|
|
public void run() {
|
|
checkVersionCurrency();
|
|
}
|
|
}, 10000, 10000);
|
|
}
|
|
private void checkVersionCurrency() {
|
|
updateNotificationChannel.send(
|
|
MessageBuilder.withPayload("app update required").build(), 1000);
|
|
}
|
|
}
|
|
|
|
public class InterviewService {
|
|
...
|
|
private void start() {
|
|
timer.schedule(() -> { checkForNotifications() }, 3000, 3000);
|
|
}
|
|
private void checkForNotifications() {
|
|
Message msg = updateNotificationChannel.receive(1000);
|
|
if (msg != null) {
|
|
... with msg.getPayload().toString()
|
|
}
|
|
}
|
|
}
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/challenge-refactor-a-spring-integration-subscribablechannel
|
|
02_04b
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/solution-refactor-a-spring-integration-subscribablechannel
|
|
|
|
public class ViewService {
|
|
private AbstractSubscribeChannel techSupportChannel;
|
|
|
|
public ViewSource() {
|
|
techSupportChannel = (PublishSubscribeChannel) ... getBean(..).;
|
|
techSupportChannel.subscribe(new ViewMessagehandler());
|
|
}
|
|
...
|
|
}
|
|
|
|
public class DashboardManager() {
|
|
...
|
|
private void initializeDashboard() {
|
|
AbstractSubscribableChannel techSupportChannel = (PublishSubscribeChannel) ... getBean(...);
|
|
}
|
|
}
|
|
|
|
public class TechSupportService() {
|
|
...
|
|
public TechSupportService() {
|
|
techSupportChannel = (PublishSubscribeChannel) ... getBean(...);
|
|
techSupportChannel.subscribe(new ServiceMessageHandler());
|
|
|
|
}
|
|
}
|
|
|
|
Endpoint as EIP
|
|
Producer|Consumer - core IPs
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/capabilities-of-a-spring-integration-endpoint
|
|
Foundational Concepts
|
|
Endpoint
|
|
* Conceptual interface between the messaging framework (channels)
|
|
and core application functionality
|
|
* Component, or collection of components, that
|
|
manipulate a message and make it consumable by
|
|
the overall business process
|
|
* A very broad descriptor of behaviour
|
|
|
|
Endpoint
|
|
* Component, or collection of components, that
|
|
manages message delivery in a way that
|
|
meets the design needs of the application
|
|
|
|
Handler
|
|
* is a lightweight interface that defines
|
|
a simple contract for handling a message
|
|
|
|
...
|
|
Channel -> <<Handler>> -> Application Component
|
|
...
|
|
|
|
interface org.springframework.messaging.MessageHandler
|
|
handleMessage()
|
|
|
|
Filter
|
|
* decide whether a message should be passed along
|
|
or dropped based on some criteria.
|
|
|
|
filter is constructed by injection of message selector
|
|
with filtering logic
|
|
|
|
MessageFilter (implements MessageHandler)
|
|
has a Selector injected
|
|
|
|
org.springframework.integration.filter package
|
|
|
|
Transform
|
|
* modify a message so that it can meet
|
|
the contractual (type) expectations of a consumer
|
|
|
|
MessageTransformingHandler (implements MessageHandler)
|
|
has a Transformer injected
|
|
|
|
org.springframework.integration.transformer package
|
|
|
|
Route
|
|
* consume messages from a message channel
|
|
and forward to one or more different message channels,
|
|
depending on a set of conditions
|
|
|
|
AbstractMessageRouter (implements MessageHandler)
|
|
|
|
org.springframework.integration.router package
|
|
|
|
Split/Aggregate
|
|
* partition a message into several parts or
|
|
combine multiple messages into a single message,
|
|
sending the resulting messages
|
|
to be processed independently
|
|
|
|
AbstractMessageSplitter (implements MessageHandler)
|
|
AbstractCorrelatingMessageHandler (implements MessageHandler)
|
|
|
|
org.springframework.integration.splitter and org.springframework.integration.aggregator packages
|
|
|
|
|
|
Service Activator Pattern
|
|
* a Service Activator is both an EI pattern
|
|
and a specific Spring Integration Implementation
|
|
|
|
* Service Activators connect the messages on a channel
|
|
to the service being accessed
|
|
|
|
Adapter Pattern
|
|
* Allows the interface of an existing class
|
|
to be used as a different interface
|
|
|
|
Gateway Pattern
|
|
* encapsulates access to an external system or resource,
|
|
separating the messaging framework from application code
|
|
|
|
3.2
|
|
https://www.linkedin.com/learning/spring-spring-integration/filter-spring-integration-messages
|
|
03_02b
|
|
|
|
<int:publish-subscribe-channel id="textSupportChannel"/>
|
|
<int:channel id="updateNotificationQueueChannel">
|
|
<int:queue capacity="5"/>
|
|
</int:channel>
|
|
<int:filter input-channel="textSupportChannel"
|
|
output-channel="updateNotificationQueueChannel"
|
|
reference="selector" />
|
|
<beans:bean id="selector" class="...StatusMonitorService.ServiceMessageFilter"/>
|
|
|
|
...
|
|
public static class ServiceMessageFilter extends TechSupportMessageFilter {
|
|
protected boolean filterMessage(AppSupportStatus status) {
|
|
return status.isUpdateRequired();
|
|
}
|
|
}
|
|
...
|
|
public abstract Class TechSupportMessageFilter implements MessageSelector {
|
|
@Overrride
|
|
public boolean accept(Message<?> message) throws MessagingException {
|
|
Object payload = message.getPayload();
|
|
if (payload instanceof AppSupportStatus) {
|
|
return filterMessage((AppSupportStatus) payload);
|
|
} else {
|
|
throw new MessageRejectingException(message, "Unknown data type has been received.");
|
|
}
|
|
}
|
|
protected abstract boolean filterMessage(AppSupportStatus status);
|
|
}
|
|
|
|
3.3
|
|
https://www.linkedin.com/learning/spring-spring-integration/transform-spring-integration-messages
|
|
03_03b
|
|
|
|
private void checkClientStatus() {
|
|
...
|
|
String rawJson = simulateRestApiCall();
|
|
System.out.println(rawJson)
|
|
...
|
|
}
|
|
|
|
<int:channel id="rawApiJsonChannel" />
|
|
<int:json-to-object-transformer input-channel="rawApiJsonChannel"
|
|
output-channel="statusMonitorChannel"
|
|
type="com.lil.springintegration.util.AppSupportStatus"/>
|
|
<int:publish-subscribe-channel id="statusMonitorChannel"/>
|
|
|
|
...
|
|
private AppSupportStatus currentLocalStatus;
|
|
|
|
private AbstractSubscribableChannel statusMonitorChannel;
|
|
private QueueChannel updateNotificationQueueChannel;
|
|
private DirectChannel apiInputChannel;
|
|
|
|
public StatusMonitorService() {
|
|
updateNotificationQueueChannel = (QueueChannel) DashboardManager.getDashboardContext().getBean("updateNotificationQueueChannel");
|
|
apiInputChannel = (DirectChannel) DashboardManager.getDashboardContext().getBean("rawApiJsonChannel");
|
|
statusMonitorChannel = (PublishSubscribeChannel) DashboardManager.getDashboardContext().getBean("statusMonitorChannel");
|
|
statusMonitorChannel.subscribe(new ServiceMessageHandler);
|
|
this.start();
|
|
}
|
|
|
|
private void start() {
|
|
}
|
|
...
|
|
|
|
{
|
|
...
|
|
apiInputChannel.send(MessageBuilder.withPayload(rawJson).build());
|
|
}
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/challenge-configure-a-service-activator
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/solution-configure-a-service-activator
|
|
03_04b
|
|
|
|
<int:service-activator input-channel="statusMonitorChannel"
|
|
ref="accountCreditService"
|
|
method="creditCustomerAccount" />
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/using-spring-integration-to-connect-with-external-systems
|
|
|
|
Adapter Pattern
|
|
Allows the interface of an existing class
|
|
to be used as a different interface
|
|
|
|
Gateway Pattern
|
|
Encapsulates access to an external system or resource,
|
|
separating the messaging framework from application code
|
|
|
|
Adapter
|
|
One way (support directed data flow)
|
|
Enables connecting a single consumer to a message channel
|
|
In = Send end of the channel
|
|
Out = Receive end of the channel
|
|
|
|
Gateway
|
|
Bidirectional
|
|
Hides the Spring Integration messaging API
|
|
from the application business logic
|
|
In = Server side of the interaction
|
|
Out = Client side of the interaction
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/jdbc-connectivity-using-spring-integration
|
|
04_02b
|
|
|
|
AppSupportStatus.java - our Domain Object
|
|
15: private ArrayList<LinkedCaseInsensitiveMap> deviceOut = new ArrayList();
|
|
|
|
jdbc.xml
|
|
<int:channel id="dataChannel" />
|
|
<!--
|
|
int-jdbc:outbound-channel-adapter - unidirectional
|
|
outbound-gateway - bidirectional
|
|
-->
|
|
<int-jdbc:inbound-channel-adapter id="gridStatusPoller"
|
|
channel="dataChannel"
|
|
query="select display from device where isup=false"
|
|
data-source="dataSource" // at application.xml
|
|
auto-startup="false">
|
|
<int:poller fixed-delay="5000">
|
|
<!-- int:transactional -->
|
|
</int:poller>
|
|
</int-jdbc:inbound-channel-adapter>
|
|
|
|
<int:transformer input-channel="dataChannel"
|
|
output-channel="statusMonitorChannel"
|
|
method="transform"
|
|
ref="transformer" />
|
|
<beans:bean id="transformer" class="com.lil.springintegration.endpoint.JdbcMessageTransformer" />
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/integrate-twitter-using-spring-integration
|
|
04_03b
|
|
|
|
twitter.xml
|
|
<int-twitter:search-inbound-channel-adapter id="twitterPoller"
|
|
twitter-template="twitterAuth"
|
|
query="#sustainability #kineteconews"
|
|
channel="updateNotificationQueueChannel"
|
|
auto-startup="false">
|
|
<int:poller fixed-rate="7000" max-messages-per-poll="1"/>
|
|
</int-twitter:search-inbound-channel-adapter>
|
|
|
|
|
|
<beans:bean id="twitterAuth" class="org.springframework.social.twitter.api.impl.TwitterTemplate...">
|
|
<beans:constructor-arg value="${twitter.oauth.consumerKey}"/>
|
|
<beans:constructor-arg value="${twitter.oauth.consumerSecret}"/>
|
|
<beans:constructor-arg value="${twitter.oauth.accessToken}"/>
|
|
<beans:constructor-arg value="${twitter.oauth.accessTokenSecret}"/>
|
|
</beans:bean>
|
|
|
|
application.xml
|
|
<import resource="integration/twitter.xml" />
|
|
|
|
ViewService.java
|
|
...
|
|
private void checkForNotifications() {
|
|
GenericMessage<?> message = (GenericMessage<?>)updateNotificationChannel.receive(...);
|
|
if (message != null) {
|
|
if (message.getPayload() instanceof AppSupportStatus) {
|
|
AppSupportStatus payload = (AppSupportStatus) message.getPayload();
|
|
DashboardManager.setDashboardStatus("softwareNotification", payload.getCustomerSof...);
|
|
DashboardManager.setDashboardStatus("deviceNotification", payload.getCustomerDevic...);
|
|
} if (message.getPayload() instanceof Tweet) {
|
|
Tweet payload = (Tweet) message.getPayload();
|
|
System.out.println("News! " + payload.getText());
|
|
DashboardManager.setDashboardStatus("latestTweeets", payload.getText());
|
|
}
|
|
}
|
|
}
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/call-a-rest-api-using-spring-integration
|
|
04_04b
|
|
...
|
|
StatusMonitorService {
|
|
...
|
|
private void start() {
|
|
timer.schedule(() -> { checkClientStatus(); }, 1000, 1000);
|
|
}
|
|
private void checkClientStatus() {
|
|
String rawJson = simulateRestApiCall();
|
|
apiInputChannel.send(MessageBuilder
|
|
.withPayload(rawJson)
|
|
.build());
|
|
}
|
|
...
|
|
}
|
|
|
|
http-api.xml
|
|
<int:channel id="pollingSignalChannel"/>
|
|
|
|
<int:inbound-channel-adapter id="apiPoller"
|
|
channel="pollingSignalChannel"
|
|
expression="''"
|
|
auto-startup="false">
|
|
<int:poller fixed-rate="3000"/>
|
|
</int:inbound-channel-adapter>
|
|
|
|
<int-http:outbound-gateway id="apiGateway"
|
|
reply-channel="apiInputChannel"
|
|
reply-timeout="5000"
|
|
expected-response-type="java.lang.String"
|
|
url="http://localhost:9090/api"
|
|
http-method="GET"
|
|
request-channel="pollingSignalChannel" />
|
|
|
|
https://www.linkedin.com/learning/spring-spring-integration/advanced-spring-integration
|
|
|