D:\dev\Repo\springsource\spring-batch\trunk\spring-batch-core\src\main\java\org\springframework\batch\core\partition\support\
    TaskExecutorPartitionHandler.java
    SimpleStepExecutionSplitter.java
    SimplePartitioner.java

Domain objects:
    JobInstance
    JobParameters
    JobExecution
    StepExecution - a StepExecution represents a single attempt to execute a Step.
    ExecutionContext - map of parameters/state for the step; contains any data a developer needs persisted
        across batch runs, such as statistics or state information needed to restart.

2.0 changes:
    - Chunk-oriented processing (read N items, then write them as a chunk): ItemReader, ItemWriter ??? ItemStream
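Quick sketch (my own, not framework code) of how those fit together: each attempt to run a Step is a
StepExecution, and anything that has to survive a restart goes into its ExecutionContext. Class name and
key are invented.

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.item.ExecutionContext;

public class ExecutionContextScratch {

    // Record progress so a restart can resume from the last committed point (key name is made up).
    void recordProgress(StepExecution stepExecution, long rowsProcessed) {
        ExecutionContext ctx = stepExecution.getExecutionContext();
        ctx.putLong("rows.processed", rowsProcessed); // persisted with the step metadata by the JobRepository
    }

    // On restart, read the value back (0 if this is the first run).
    long lastCommittedRow(StepExecution stepExecution) {
        return stepExecution.getExecutionContext().getLong("rows.processed", 0L);
    }
}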
/**
 * Strategy interface for providing the data.
 *
 * Implementations are expected to be stateful and will be called multiple times
 * for each batch, with each call to {@link #read()} returning a different value
 * and finally returning null when all input data is exhausted.
 *
 * Implementations need *not* be thread safe and clients of an {@link ItemReader}
 * need to be aware that this is the case.
 *
 * A richer interface (e.g. with a look ahead or peek) is not feasible because
 * we need to support transactions in an asynchronous batch.
 */
public interface ItemReader<T> {

    /**
     * Reads a piece of input data and advances to the next one. Implementations
     * must return null at the end of the input data set. In a transactional
     * setting, the caller might get the same item twice from successive calls
     * (or otherwise), if the first call was in a transaction that rolled back.
     *
     * @throws Exception if an underlying resource is unavailable.
     */
    T read() throws Exception, UnexpectedInputException, ParseException;
}
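To make the read()-until-null contract concrete, a throwaway reader backed by an in-memory list
(my own illustration, not a framework class):

import java.util.Arrays;
import java.util.Iterator;

import org.springframework.batch.item.ItemReader;

// Illustrative only: returns one item per call and null once the input is exhausted.
public class InMemoryItemReader implements ItemReader<String> {

    private final Iterator<String> items = Arrays.asList("alpha", "beta", "gamma").iterator();

    public String read() {
        // Signal "no more input" with null, as required by the contract above.
        return items.hasNext() ? items.next() : null;
    }
}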

/**
 * Marker interface defining a contract for periodically storing state and
 * restoring from that state should an error occur.
 */
public interface ItemStream {

    /**
     * Open the stream for the provided {@link ExecutionContext}.
     *
     * @throws IllegalArgumentException if context is null
     */
    void open(ExecutionContext executionContext) throws ItemStreamException;

    /**
     * Indicates that the execution context provided during open is about to be
     * saved. If any state is remaining, but has not been put in the context, it
     * should be added here.
     *
     * @param executionContext to be updated
     * @throws IllegalArgumentException if executionContext is null.
     */
    void update(ExecutionContext executionContext) throws ItemStreamException;

    /**
     * If any resources are needed for the stream to operate they need to be
     * destroyed here. Once this method has been called all other methods
     * (except open) may throw an exception.
     */
    void close() throws ItemStreamException;
}
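Sketch of a stateful reader implementing both ItemReader and ItemStream, stashing its position in the
ExecutionContext so a restart resumes where it left off (class and key names are invented):

import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;

// Illustrative only: a list-backed reader that can resume from the last saved index.
public class RestartableListReader implements ItemReader<String>, ItemStream {

    private static final String INDEX_KEY = "restartable.list.reader.index"; // invented key

    private final List<String> items;
    private int index = 0;

    public RestartableListReader(List<String> items) {
        this.items = items;
    }

    public String read() {
        return index < items.size() ? items.get(index++) : null;
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // Restore state if a previous execution saved it; otherwise start from the beginning.
        index = executionContext.getInt(INDEX_KEY, 0);
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // Called before the context is persisted (typically at each chunk commit).
        executionContext.putInt(INDEX_KEY, index);
    }

    public void close() throws ItemStreamException {
        // Nothing to release for an in-memory list.
    }
}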

-----
StepExecutionSplitter
PartitionHandler
-----

--------------------------------------------------------------------------------------

PartitionStep (org.springframework.batch.core.partition.support) extends AbstractStep
??? doesn't have an item reader/writer

{
    ...
    @Override
    protected void doExecute(StepExecution stepExecution) throws Exception {

        // Wait for task completion and then aggregate the results
        Collection<StepExecution> executions = partitionHandler.handle(stepExecutionSplitter, stepExecution);
        stepExecution.upgradeStatus(BatchStatus.COMPLETED);
        stepExecutionAggregator.aggregate(stepExecution, executions);

        // If anything failed or had a problem we need to crap out
        if (stepExecution.getStatus().isUnsuccessful()) {
            throw new JobExecutionException("Partition handler returned an unsuccessful step");
        }
    }
    ...
}
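The doExecute() above just delegates to the PartitionHandler. To see the contract, a naive handler that runs
every partition sequentially in the calling thread (my own sketch; the PartitionHandler/StepExecutionSplitter
signatures are from memory, and the framework's TaskExecutorPartitionHandler is the real implementation to study):

import java.util.Collection;

import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.partition.PartitionHandler;
import org.springframework.batch.core.partition.StepExecutionSplitter;

// Illustrative only: executes each partition in the calling thread.
// TaskExecutorPartitionHandler does the same job but farms executions out to a TaskExecutor,
// and a remote handler could send them to another JVM entirely.
public class SequentialPartitionHandler implements PartitionHandler {

    private Step step;        // the worker step to run once per partition
    private int gridSize = 4; // how many partitions to ask the splitter for

    public void setStep(Step step) { this.step = step; }
    public void setGridSize(int gridSize) { this.gridSize = gridSize; }

    public Collection<StepExecution> handle(StepExecutionSplitter stepSplitter,
            StepExecution managerStepExecution) throws Exception {
        // The splitter creates one StepExecution (with its own ExecutionContext) per partition.
        Collection<StepExecution> executions = stepSplitter.split(managerStepExecution, gridSize);
        for (StepExecution execution : executions) {
            step.execute(execution); // status/exit status end up on each worker execution
        }
        return executions; // PartitionStep aggregates these results (see doExecute above)
    }
}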
/**
 * A {@link Step} implementation that provides common behavior to subclasses,
 * including registering and calling listeners.
 */
public abstract class AbstractStep implements Step, InitializingBean, BeanNameAware {

    private static final Log logger = LogFactory.getLog(AbstractStep.class);

    private String name;

    private int startLimit = Integer.MAX_VALUE;

    private boolean allowStartIfComplete = false;

    private CompositeStepExecutionListener stepExecutionListener = new CompositeStepExecutionListener();

    private JobRepository jobRepository;

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(jobRepository, "JobRepository is mandatory");
    }

    /**
     * Extension point for subclasses to execute business logic. Subclasses
     * should set the {@link ExitStatus} on the {@link StepExecution} before
     * returning.
     *
     * @param stepExecution the current step context
     * @throws Exception
     */
    protected abstract void doExecute(StepExecution stepExecution) throws Exception;

    /**
     * Extension point for subclasses to provide callbacks to their
     * collaborators at the beginning of a step, to open or acquire resources.
     * Does nothing by default.
     *
     * @param ctx the {@link ExecutionContext} to use
     * @throws Exception
     */
    protected void open(ExecutionContext ctx) throws Exception {
    }

    /**
     * Extension point for subclasses to provide callbacks to their
     * collaborators at the end of a step (right at the end of the finally
     * block), to close or release resources. Does nothing by default.
     *
     * @param ctx the {@link ExecutionContext} to use
     * @throws Exception
     */
    protected void close(ExecutionContext ctx) throws Exception {
    }

    /**
     * Template method for step execution logic - calls abstract methods for
     * resource initialization ({@link #open(ExecutionContext)}), execution
     * logic ({@link #doExecute(StepExecution)}) and resource closing
     * ({@link #close(ExecutionContext)}).
     */
    public final void execute(StepExecution stepExecution) throws JobInterruptedException,
            UnexpectedJobExecutionException {

        logger.debug("Executing: id=" + stepExecution.getId());
        stepExecution.setStartTime(new Date());
        stepExecution.setStatus(BatchStatus.STARTED);
        getJobRepository().update(stepExecution);

        // Start with a default value that will be trumped by anything
        ExitStatus exitStatus = ExitStatus.EXECUTING;
        Exception commitException = null;

        StepSynchronizationManager.register(stepExecution);

        try {
            getCompositeListener().beforeStep(stepExecution);
            open(stepExecution.getExecutionContext());

            try {
                doExecute(stepExecution);
            }
            catch (RepeatException e) {
                throw e.getCause();
            }
            exitStatus = ExitStatus.COMPLETED.and(stepExecution.getExitStatus());

            // Check if someone is trying to stop us
            if (stepExecution.isTerminateOnly()) {
                throw new JobInterruptedException("JobExecution interrupted.");
            }

            // Need to upgrade here not set, in case the execution was stopped
            stepExecution.upgradeStatus(BatchStatus.COMPLETED);
            logger.debug("Step execution success: id=" + stepExecution.getId());
        }
        catch (Throwable e) {
            logger.error("Encountered an error executing the step", e);
            stepExecution.upgradeStatus(determineBatchStatus(e));
            exitStatus = exitStatus.and(getDefaultExitStatusForFailure(e));
            stepExecution.addFailureException(e);
        }
        finally {

            try {
                // Update the step execution to the latest known value so the listeners can act on it
                exitStatus = exitStatus.and(stepExecution.getExitStatus());
                stepExecution.setExitStatus(exitStatus);
                exitStatus = exitStatus.and(getCompositeListener().afterStep(stepExecution));
            }
            catch (Exception e) {
                logger.error("Exception in afterStep callback", e);
            }

            try {
                getJobRepository().updateExecutionContext(stepExecution);
            }
            catch (Exception e) {
                stepExecution.setStatus(BatchStatus.UNKNOWN);
                exitStatus = exitStatus.and(ExitStatus.UNKNOWN);
                stepExecution.addFailureException(e);
                logger.error("Encountered an error saving batch meta data. "
                        + "This job is now in an unknown state and should not be restarted.", commitException);
            }

            stepExecution.setEndTime(new Date());
            stepExecution.setExitStatus(exitStatus);

            try {
                getJobRepository().update(stepExecution);
            }
            catch (Exception e) {
                stepExecution.setStatus(BatchStatus.UNKNOWN);
                stepExecution.setExitStatus(exitStatus.and(ExitStatus.UNKNOWN));
                stepExecution.addFailureException(e);
                logger.error("Encountered an error saving batch meta data. "
                        + "This job is now in an unknown state and should not be restarted.", commitException);
            }

            try {
                close(stepExecution.getExecutionContext());
            }
            catch (Exception e) {
                logger.error("Exception while closing step execution resources", e);
                stepExecution.addFailureException(e);
            }

            StepSynchronizationManager.release();

            logger.debug("Step execution complete: " + stepExecution.getSummary());
        }
    }

    /**
     * Determine the step status based on the exception.
     */
    private static BatchStatus determineBatchStatus(Throwable e) {
        if (e instanceof JobInterruptedException || e.getCause() instanceof JobInterruptedException) {
            return BatchStatus.STOPPED;
        }
        else {
            return BatchStatus.FAILED;
        }
    }
    /**
     * Default mapping from throwable to {@link ExitStatus}. Clients can modify
     * the exit code using a {@link StepExecutionListener}.
     *
     * @param ex the cause of the failure
     * @return an {@link ExitStatus}
     */
    private ExitStatus getDefaultExitStatusForFailure(Throwable ex) {
        ExitStatus exitStatus;
        if (ex instanceof JobInterruptedException || ex.getCause() instanceof JobInterruptedException) {
            exitStatus = ExitStatus.STOPPED.addExitDescription(JobInterruptedException.class.getName());
        }
        else if (ex instanceof NoSuchJobException || ex.getCause() instanceof NoSuchJobException) {
            exitStatus = new ExitStatus(ExitCodeMapper.NO_SUCH_JOB, ex.getClass().getName());
        }
        else {
            exitStatus = ExitStatus.FAILED.addExitDescription(ex);
        }
        return exitStatus;
    }
}
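The javadoc on getDefaultExitStatusForFailure() says the exit code can be overridden with a
StepExecutionListener; roughly like this (my own sketch, assuming the single-argument ExitStatus
constructor and the StepExecutionListenerSupport base class, and an invented exit code string):

import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.listener.StepExecutionListenerSupport;

// Illustrative only: maps a failed step to a custom exit code that a scheduler could react to.
public class CustomExitCodeListener extends StepExecutionListenerSupport {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getStatus() == BatchStatus.FAILED) {
            return new ExitStatus("FAILED_NEEDS_MANUAL_RESTART");
        }
        return null; // null means "leave the exit status alone"
    }
}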
/**
 * Batch domain interface representing the configuration of a step. As with the
 * {@link Job}, a {@link Step} is meant to explicitly represent the configuration
 * of a step by a developer, but also the ability to execute the step.
 */
public interface Step {

    // @return the name of this step.
    String getName();

    // @return true if a step that is already marked as complete can be started again.
    boolean isAllowStartIfComplete();

    // @return the number of times a job can be started with the same identifier.
    int getStartLimit();

    /**
     * Process the step and assign progress and status meta information to the
     * {@link StepExecution} provided. The {@link Step} is responsible for setting
     * the meta information and also saving it if required by the implementation.
     *
     * It is not safe to re-use an instance of {@link Step} to process multiple
     * concurrent executions.
     *
     * @param stepExecution an entity representing the step to be executed
     *
     * @throws JobInterruptedException if the step is interrupted externally
     */
    void execute(StepExecution stepExecution) throws JobInterruptedException;
}

ColumnRangePartitioner - partition based on column ranges (see the sketch at the end of these notes)

ProActive:
    ProActiveMaster
    ProActiveSchedulerPartitionHandler implements PartitionHandler
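Sketch of the column-range idea mentioned above (my own illustration of the approach, not the actual sample
class; the minValue/maxValue keys and the even-split math are assumptions): the Partitioner creates one
ExecutionContext per partition, and each worker step reads only the rows in its assigned key range.

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

// Illustrative only: split a numeric key range [min, max] into gridSize partitions,
// handing each worker step its own sub-range via the ExecutionContext.
public class KeyRangePartitioner implements Partitioner {

    private long min = 1;    // lowest key value (in a real partitioner this would come from the database)
    private long max = 1000; // highest key value

    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
        long targetSize = (max - min) / gridSize + 1;

        long start = min;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putLong("minValue", start);
            context.putLong("maxValue", Math.min(start + targetSize - 1, max));
            result.put("partition" + i, context);
            start += targetSize;
        }
        return result;
    }
}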