зеркало из
				https://github.com/iharh/notes.git
				synced 2025-11-04 07:36:08 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			365 строки
		
	
	
		
			11 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
			
		
		
	
	
			365 строки
		
	
	
		
			11 KiB
		
	
	
	
		
			Plaintext
		
	
	
	
	
	
D:\dev\Repo\springsource\spring-batch\trunk\spring-batch-core\src\main\java\org\springframework\batch\core\partition\support\
 | 
						|
TaskExecutorPartitionHandler.java
 | 
						|
SimpleStepExecutionSplitter.java
 | 
						|
SimplePartitioner.java
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
JobInstance
 | 
						|
 | 
						|
JobParameters
 | 
						|
 | 
						|
JobExecution
 | 
						|
 | 
						|
 | 
						|
StepExecution    - A StepExecution represents a single attempt to execute a Step.
 | 
						|
 | 
						|
ExecutionContext - Map of parameters/states for the step, which contains any data a developer needs persisted
 | 
						|
                   across batch runs, such as statistics or state information needed to restart
 | 
						|
 | 
						|
2.0 Changes:
 | 
						|
- Chunk-oriented processing (read N items, then write them as a chunk)
 | 
						|
ItemReader, <ItemProcessor>, ItemWriter
 | 
						|
??? ItemStream
 | 
						|
 | 
						|
/**
 | 
						|
 * Strategy interface for providing the data. <br/>
 | 
						|
 * 
 | 
						|
 * Implementations are expected to be stateful and will be called multiple times
 | 
						|
 * for each batch, with each call to {@link #read()} returning a different value
 | 
						|
 * and finally returning <code>null</code> when all input data is
 | 
						|
 * exhausted.<br/>
 | 
						|
 * 
 | 
						|
 * Implementations need *not* be thread safe and clients of a {@link ItemReader}
 | 
						|
 * need to be aware that this is the case.<br/>
 | 
						|
 * 
 | 
						|
 * A richer interface (e.g. with a look ahead or peek) is not feasible because
 | 
						|
 * we need to support transactions in an asynchronous batch.
 | 
						|
 */
 | 
						|
public interface ItemReader<T> {
 | 
						|
 | 
						|
	/**
 | 
						|
	 * Reads a piece of input data and advance to the next one. Implementations
 | 
						|
	 * <strong>must</strong> return <code>null</code> at the end of the input
 | 
						|
	 * data set. In a transactional setting, caller might get the same item
 | 
						|
	 * twice from successive calls (or otherwise), if the first call was in a
 | 
						|
	 * transaction that rolled back.
 | 
						|
	 * 
 | 
						|
	 * @throws Exception if an underlying resource is unavailable.
 | 
						|
	 */
 | 
						|
	T read() throws Exception, UnexpectedInputException, ParseException;
 | 
						|
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
 | 
						|
/**
 | 
						|
 * <p>
 | 
						|
 * Marker interface defining a contract for periodically storing state and restoring from that state should an error
 | 
						|
 * occur.
 | 
						|
 * <p>
 | 
						|
 */
 | 
						|
public interface ItemStream {
 | 
						|
 | 
						|
	/**
 | 
						|
	 * Open the stream for the provided {@link ExecutionContext}.
 | 
						|
	 * 
 | 
						|
	 * @throws IllegalArgumentException if context is null
 | 
						|
	 */
 | 
						|
	void open(ExecutionContext executionContext) throws ItemStreamException;
 | 
						|
 | 
						|
	/**
 | 
						|
	 * Indicates that the execution context provided during open is about to be saved. If any state is remaining, but
 | 
						|
	 * has not been put in the context, it should be added here.
 | 
						|
	 * 
 | 
						|
	 * @param executionContext to be updated
 | 
						|
	 * @throws IllegalArgumentException if executionContext is null.
 | 
						|
	 */
 | 
						|
	void update(ExecutionContext executionContext) throws ItemStreamException;
 | 
						|
 | 
						|
	/**
 | 
						|
	 * If any resources are needed for the stream to operate they need to be destroyed here. Once this method has been
 | 
						|
	 * called all other methods (except open) may throw an exception.
 | 
						|
	 */
 | 
						|
	void close() throws ItemStreamException;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
 | 
						|
-----
 | 
						|
StepExecutionSplitter
 | 
						|
PartitionHandler
 | 
						|
-----
 | 
						|
 | 
						|
--------------------------------------------------------------------------------------
 | 
						|
 | 
						|
PartitionStep (org.springframework.batch.core.partition.support) extends AbstractStep
 | 
						|
??? don't have item reader/writer
 | 
						|
{
 | 
						|
...
 | 
						|
	@Override
 | 
						|
	protected void doExecute(StepExecution stepExecution) throws Exception {
 | 
						|
 | 
						|
		// Wait for task completion and then aggregate the results
 | 
						|
		Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
 | 
						|
		stepExecution.upgradeStatus(BatchStatus.COMPLETED);
 | 
						|
		stepExecutionAggregator.aggregate(stepExecution, executions);
 | 
						|
 | 
						|
		// If anything failed or had a problem we need to crap out
 | 
						|
		if (stepExecution.getStatus().isUnsuccessful()) {
 | 
						|
			throw new JobExecutionException("Partition handler returned an unsuccessful step");
 | 
						|
		}
 | 
						|
 | 
						|
	}
 | 
						|
...
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
/**
 | 
						|
 * A {@link Step} implementation that provides common behavior to subclasses,
 | 
						|
 * including registering and calling listeners.
 | 
						|
 */
 | 
						|
public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware {
 | 
						|
	private static final Log logger = LogFactory.getLog(AbstractStep.class);
 | 
						|
 | 
						|
	private String name;
 | 
						|
 | 
						|
	private int startLimit = Integer.MAX_VALUE;
 | 
						|
 | 
						|
	private boolean allowStartIfComplete = false;
 | 
						|
 | 
						|
	private CompositeStepExecutionListener stepExecutionListener = new CompositeStepExecutionListener();
 | 
						|
 | 
						|
	private JobRepository jobRepository;
 | 
						|
 | 
						|
	public void afterPropertiesSet() throws Exception {
 | 
						|
		Assert.notNull(jobRepository, "JobRepository is mandatory");
 | 
						|
	}
 | 
						|
 | 
						|
	/**
 | 
						|
	 * Extension point for subclasses to execute business logic. Subclasses
 | 
						|
	 * should set the {@link ExitStatus} on the {@link StepExecution} before
 | 
						|
	 * returning.
 | 
						|
	 * 
 | 
						|
	 * @param stepExecution the current step context
 | 
						|
	 * @throws Exception
 | 
						|
	 */
 | 
						|
	protected abstract void doExecute(StepExecution stepExecution) throws Exception;
 | 
						|
 | 
						|
	/**
 | 
						|
	 * Extension point for subclasses to provide callbacks to their
 | 
						|
	 * collaborators at the beginning of a step, to open or acquire resources.
 | 
						|
	 * Does nothing by default.
 | 
						|
	 * 
 | 
						|
	 * @param ctx the {@link ExecutionContext} to use
 | 
						|
	 * @throws Exception
 | 
						|
	 */
 | 
						|
	protected void open(ExecutionContext ctx) throws Exception {
 | 
						|
	}
 | 
						|
 | 
						|
	/**
 | 
						|
	 * Extension point for subclasses to provide callbacks to their
 | 
						|
	 * collaborators at the end of a step (right at the end of the finally
 | 
						|
	 * block), to close or release resources. Does nothing by default.
 | 
						|
	 * 
 | 
						|
	 * @param ctx the {@link ExecutionContext} to use
 | 
						|
	 * @throws Exception
 | 
						|
	 */
 | 
						|
	protected void close(ExecutionContext ctx) throws Exception {
 | 
						|
	}
 | 
						|
 | 
						|
	/**
 | 
						|
	 * Template method for step execution logic - calls abstract methods for
 | 
						|
	 * resource initialization ({@link #open(ExecutionContext)}), execution
 | 
						|
	 * logic ({@link #doExecute(StepExecution)}) and resource closing (
 | 
						|
	 * {@link #close(ExecutionContext)}).
 | 
						|
	 */
 | 
						|
	public final void execute(StepExecution stepExecution) throws JobInterruptedException,
 | 
						|
			UnexpectedJobExecutionException {
 | 
						|
		
 | 
						|
		logger.debug("Executing: id="+stepExecution.getId());
 | 
						|
		stepExecution.setStartTime(new Date());
 | 
						|
		stepExecution.setStatus(BatchStatus.STARTED);
 | 
						|
		getJobRepository().update(stepExecution);
 | 
						|
 | 
						|
		// Start with a default value that will be trumped by anything
 | 
						|
		ExitStatus exitStatus = ExitStatus.EXECUTING;
 | 
						|
		Exception commitException = null;
 | 
						|
 | 
						|
		StepSynchronizationManager.register(stepExecution);
 | 
						|
 | 
						|
		try {
 | 
						|
			getCompositeListener().beforeStep(stepExecution);
 | 
						|
			open(stepExecution.getExecutionContext());
 | 
						|
 | 
						|
			try {
 | 
						|
				doExecute(stepExecution);
 | 
						|
			}
 | 
						|
			catch (RepeatException e) {
 | 
						|
				throw e.getCause();
 | 
						|
			}
 | 
						|
			exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());
 | 
						|
 | 
						|
			// Check if someone is trying to stop us
 | 
						|
			if (stepExecution.isTerminateOnly()) {
 | 
						|
				throw new JobInterruptedException("JobExecution interrupted.");
 | 
						|
			}
 | 
						|
 | 
						|
			// Need to upgrade here not set, in case the execution was stopped
 | 
						|
			stepExecution.upgradeStatus(BatchStatus.COMPLETED);
 | 
						|
			logger.debug("Step execution success: id=" + stepExecution.getId());
 | 
						|
		}
 | 
						|
		catch (Throwable e) {
 | 
						|
			logger.error("Encountered an error executing the step", e);
 | 
						|
			stepExecution.upgradeStatus(determineBatchStatus(e));
 | 
						|
			exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
 | 
						|
			stepExecution.addFailureException(e);
 | 
						|
		}
 | 
						|
		finally {
 | 
						|
 | 
						|
			try {
 | 
						|
				// Update the step execution to the latest known value so the listeners can act on it
 | 
						|
				exitStatus = exitStatus.and(stepExecution.getExitStatus());
 | 
						|
				stepExecution.setExitStatus(exitStatus);
 | 
						|
				exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
 | 
						|
			}
 | 
						|
			catch (Exception e) {
 | 
						|
				logger.error("Exception in afterStep callback", e);
 | 
						|
			}
 | 
						|
 | 
						|
			try {
 | 
						|
				getJobRepository().updateExecutionContext(stepExecution);
 | 
						|
			}
 | 
						|
			catch (Exception e) {
 | 
						|
				stepExecution.setStatus(BatchStatus.UNKNOWN);
 | 
						|
				exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
 | 
						|
				stepExecution.addFailureException(e);
 | 
						|
				logger.error("Encountered an error saving batch meta data."
 | 
						|
						+ "This job is now in an unknown state and should not be restarted.", commitException);
 | 
						|
			}
 | 
						|
 | 
						|
			stepExecution.setEndTime(new Date());
 | 
						|
			stepExecution.setExitStatus(exitStatus);
 | 
						|
 | 
						|
			try {
 | 
						|
				getJobRepository().update(stepExecution);
 | 
						|
			}
 | 
						|
			catch (Exception e) {
 | 
						|
				stepExecution.setStatus(BatchStatus.UNKNOWN);
 | 
						|
				stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
 | 
						|
				stepExecution.addFailureException(e);
 | 
						|
				logger.error("Encountered an error saving batch meta data."
 | 
						|
						+ "This job is now in an unknown state and should not be restarted.", commitException);
 | 
						|
			}
 | 
						|
 | 
						|
			try {
 | 
						|
				close(stepExecution.getExecutionContext());
 | 
						|
			}
 | 
						|
			catch (Exception e) {
 | 
						|
				logger.error("Exception while closing step execution resources", e);
 | 
						|
				stepExecution.addFailureException(e);
 | 
						|
			}
 | 
						|
 | 
						|
			StepSynchronizationManager.release();
 | 
						|
 | 
						|
			logger.debug("Step execution complete: " + stepExecution.getSummary());
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	/**
 | 
						|
	 * Determine the step status based on the exception.
 | 
						|
	 */
 | 
						|
	private static BatchStatus determineBatchStatus(Throwable e) {
 | 
						|
		if (e instanceof JobInterruptedException || e.getCause() instanceof JobInterruptedException) {
 | 
						|
			return BatchStatus.STOPPED;
 | 
						|
		}
 | 
						|
		else {
 | 
						|
			return BatchStatus.FAILED;
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	/**
 | 
						|
	 * Default mapping from throwable to {@link ExitStatus}. Clients can modify
 | 
						|
	 * the exit code using a {@link StepExecutionListener}.
 | 
						|
	 * 
 | 
						|
	 * @param ex the cause of the failure
 | 
						|
	 * @return an {@link ExitStatus}
 | 
						|
	 */
 | 
						|
	private ExitStatus getDefaultExitStatusForFailure(Throwable ex) {
 | 
						|
		ExitStatus exitStatus;
 | 
						|
		if (ex instanceof JobInterruptedException || ex.getCause() instanceof JobInterruptedException) {
 | 
						|
			exitStatus = ExitStatus.STOPPED.addExitDescription(JobInterruptedException.class.getName());
 | 
						|
		}
 | 
						|
		else if (ex instanceof NoSuchJobException || ex.getCause() instanceof NoSuchJobException) {
 | 
						|
			exitStatus = new ExitStatus(ExitCodeMapper.NO_SUCH_JOB, ex.getClass().getName());
 | 
						|
		}
 | 
						|
		else {
 | 
						|
			exitStatus = ExitStatus.FAILED.addExitDescription(ex);
 | 
						|
		}
 | 
						|
 | 
						|
		return exitStatus;
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
/**
 | 
						|
 * Batch domain interface representing the configuration of a step. As with the {@link Job}, a {@link Step} is meant to
 | 
						|
 * explicitly represent a the configuration of a step by a developer, but also the ability to execute the step.
 | 
						|
 */
 | 
						|
public interface Step {
 | 
						|
	// @return the name of this step.
 | 
						|
	String getName();
 | 
						|
 | 
						|
	// @return true if a step that is already marked as complete can be started again.
 | 
						|
	boolean isAllowStartIfComplete();
 | 
						|
 | 
						|
	// @return the number of times a job can be started with the same identifier.
 | 
						|
	int getStartLimit();
 | 
						|
 | 
						|
	/**
 | 
						|
	 * Process the step and assign progress and status meta information to the {@link StepExecution} provided. The
 | 
						|
	 * {@link Step} is responsible for setting the meta information and also saving it if required by the
 | 
						|
	 * implementation.<br/>
 | 
						|
	 * 
 | 
						|
	 * It is not safe to re-use an instance of {@link Step} to process multiple concurrent executions.
 | 
						|
	 * 
 | 
						|
	 * @param stepExecution an entity representing the step to be executed
 | 
						|
	 * 
 | 
						|
	 * @throws JobInterruptedException if the step is interrupted externally
 | 
						|
	 */
 | 
						|
	void execute(StepExecution stepExecution) throws JobInterruptedException;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
 | 
						|
ColumnRangePartitioner - partition based on column ranges
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
 | 
						|
ProActive:
 | 
						|
 | 
						|
ProActiveMaster
 | 
						|
 | 
						|
ProActiveSchedulerPartitionHandler implements PartitionHandler
 | 
						|
 | 
						|
 |