Skip to content

Commit 4185400

Browse files
fmbenhassinemminella
authored andcommitted
Add JpaCursorItemReader implementation
Issue #901
1 parent d87588c commit 4185400

File tree

4 files changed

+654
-0
lines changed

4 files changed

+654
-0
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.database;
17+
18+
import java.util.Iterator;
19+
import java.util.Map;
20+
21+
import javax.persistence.EntityManager;
22+
import javax.persistence.EntityManagerFactory;
23+
import javax.persistence.Query;
24+
25+
import org.springframework.batch.item.ExecutionContext;
26+
import org.springframework.batch.item.ItemStreamException;
27+
import org.springframework.batch.item.database.orm.JpaQueryProvider;
28+
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
29+
import org.springframework.beans.factory.InitializingBean;
30+
import org.springframework.dao.DataAccessResourceFailureException;
31+
import org.springframework.util.Assert;
32+
import org.springframework.util.ClassUtils;
33+
34+
/**
35+
* {@link org.springframework.batch.item.ItemStreamReader} implementation based
36+
* on JPA {@link Query#getResultStream()}. It executes the JPQL query when
37+
* initialized and iterates over the result set as {@link #read()} method is called,
38+
* returning an object corresponding to the current row. The query can be set
39+
* directly using {@link #setQueryString(String)}, or using a query provider via
40+
* {@link #setQueryProvider(JpaQueryProvider)}.
41+
*
42+
* The implementation is <b>not</b> thread-safe.
43+
*
44+
* @author Mahmoud Ben Hassine
45+
* @param <T> type of items to read
46+
* @since 4.3
47+
*/
48+
public class JpaCursorItemReader<T> extends AbstractItemCountingItemStreamItemReader<T>
49+
implements InitializingBean {
50+
51+
private EntityManagerFactory entityManagerFactory;
52+
private EntityManager entityManager;
53+
private String queryString;
54+
private JpaQueryProvider queryProvider;
55+
private Map<String, Object> parameterValues;
56+
private Iterator<T> iterator;
57+
58+
/**
59+
* Create a new {@link JpaCursorItemReader}.
60+
*/
61+
public JpaCursorItemReader() {
62+
setName(ClassUtils.getShortName(JpaCursorItemReader.class));
63+
}
64+
65+
/**
66+
* Set the JPA entity manager factory.
67+
*
68+
* @param entityManagerFactory JPA entity manager factory
69+
*/
70+
public void setEntityManagerFactory(EntityManagerFactory entityManagerFactory) {
71+
this.entityManagerFactory = entityManagerFactory;
72+
}
73+
74+
/**
75+
* Set the JPA query provider.
76+
*
77+
* @param queryProvider JPA query provider
78+
*/
79+
public void setQueryProvider(JpaQueryProvider queryProvider) {
80+
this.queryProvider = queryProvider;
81+
}
82+
83+
/**
84+
* Set the JPQL query string.
85+
*
86+
* @param queryString JPQL query string
87+
*/
88+
public void setQueryString(String queryString) {
89+
this.queryString = queryString;
90+
}
91+
92+
/**
93+
* Set the parameter values to be used for the query execution.
94+
*
95+
* @param parameterValues the values keyed by parameter names used in
96+
* the query string.
97+
*/
98+
public void setParameterValues(Map<String, Object> parameterValues) {
99+
this.parameterValues = parameterValues;
100+
}
101+
102+
@Override
103+
public void afterPropertiesSet() throws Exception {
104+
Assert.notNull(this.entityManagerFactory, "EntityManagerFactory is required");
105+
if (this.queryProvider == null) {
106+
Assert.hasLength(this.queryString, "Query string is required when queryProvider is null");
107+
}
108+
}
109+
110+
@Override
111+
@SuppressWarnings("unchecked")
112+
protected void doOpen() throws Exception {
113+
this.entityManager = this.entityManagerFactory.createEntityManager();
114+
if (this.entityManager == null) {
115+
throw new DataAccessResourceFailureException("Unable to create an EntityManager");
116+
}
117+
if (this.queryProvider != null) {
118+
this.queryProvider.setEntityManager(this.entityManager);
119+
}
120+
Query query = createQuery();
121+
if (this.parameterValues != null) {
122+
this.parameterValues.forEach(query::setParameter);
123+
}
124+
this.iterator = query.getResultStream().iterator();
125+
}
126+
127+
private Query createQuery() {
128+
if (this.queryProvider == null) {
129+
return this.entityManager.createQuery(this.queryString);
130+
}
131+
else {
132+
return this.queryProvider.createQuery();
133+
}
134+
}
135+
136+
@Override
137+
protected T doRead() {
138+
return this.iterator.hasNext() ? this.iterator.next() : null;
139+
}
140+
141+
@Override
142+
public void update(ExecutionContext executionContext) throws ItemStreamException {
143+
super.update(executionContext);
144+
this.entityManager.clear();
145+
}
146+
147+
@Override
148+
protected void doClose() {
149+
if (this.entityManager != null) {
150+
this.entityManager.close();
151+
}
152+
}
153+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.database.builder;
17+
18+
import java.util.Map;
19+
20+
import javax.persistence.EntityManagerFactory;
21+
22+
import org.springframework.batch.item.ExecutionContext;
23+
import org.springframework.batch.item.ItemStreamSupport;
24+
import org.springframework.batch.item.database.JpaCursorItemReader;
25+
import org.springframework.batch.item.database.orm.JpaQueryProvider;
26+
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
27+
import org.springframework.util.Assert;
28+
29+
/**
30+
* Builder for {@link JpaCursorItemReader}.
31+
*
32+
* @author Mahmoud Ben Hassine
33+
*
34+
* @since 4.3
35+
*/
36+
public class JpaCursorItemReaderBuilder<T> {
37+
38+
private EntityManagerFactory entityManagerFactory;
39+
private String queryString;
40+
private JpaQueryProvider queryProvider;
41+
private Map<String, Object> parameterValues;
42+
private boolean saveState = true;
43+
private String name;
44+
private int maxItemCount = Integer.MAX_VALUE;
45+
private int currentItemCount;
46+
47+
/**
48+
* Configure if the state of the {@link ItemStreamSupport}
49+
* should be persisted within the {@link ExecutionContext}
50+
* for restart purposes.
51+
*
52+
* @param saveState defaults to true
53+
* @return The current instance of the builder.
54+
*/
55+
public JpaCursorItemReaderBuilder<T> saveState(boolean saveState) {
56+
this.saveState = saveState;
57+
58+
return this;
59+
}
60+
61+
/**
62+
* The name used to calculate the key within the {@link ExecutionContext}.
63+
* Required if {@link #saveState(boolean)} is set to true.
64+
*
65+
* @param name name of the reader instance
66+
* @return The current instance of the builder.
67+
* @see ItemStreamSupport#setName(String)
68+
*/
69+
public JpaCursorItemReaderBuilder<T> name(String name) {
70+
this.name = name;
71+
72+
return this;
73+
}
74+
75+
/**
76+
* Configure the max number of items to be read.
77+
*
78+
* @param maxItemCount the max items to be read
79+
* @return The current instance of the builder.
80+
* @see AbstractItemCountingItemStreamItemReader#setMaxItemCount(int)
81+
*/
82+
public JpaCursorItemReaderBuilder<T> maxItemCount(int maxItemCount) {
83+
this.maxItemCount = maxItemCount;
84+
85+
return this;
86+
}
87+
88+
/**
89+
* Index for the current item. Used on restarts to indicate where to start from.
90+
*
91+
* @param currentItemCount current index
92+
* @return this instance for method chaining
93+
* @see AbstractItemCountingItemStreamItemReader#setCurrentItemCount(int)
94+
*/
95+
public JpaCursorItemReaderBuilder<T> currentItemCount(int currentItemCount) {
96+
this.currentItemCount = currentItemCount;
97+
98+
return this;
99+
}
100+
101+
/**
102+
* A map of parameter values to be set on the query. The key of the map is
103+
* the name of the parameter to be set with the value being the value to be set.
104+
*
105+
* @param parameterValues map of values
106+
* @return this instance for method chaining
107+
* @see JpaCursorItemReader#setParameterValues(Map)
108+
*/
109+
public JpaCursorItemReaderBuilder<T> parameterValues(Map<String, Object> parameterValues) {
110+
this.parameterValues = parameterValues;
111+
112+
return this;
113+
}
114+
115+
/**
116+
* A query provider. This should be set only if {@link #queryString(String)}
117+
* have not been set.
118+
*
119+
* @param queryProvider the query provider
120+
* @return this instance for method chaining
121+
* @see JpaCursorItemReader#setQueryProvider(JpaQueryProvider)
122+
*/
123+
public JpaCursorItemReaderBuilder<T> queryProvider(JpaQueryProvider queryProvider) {
124+
this.queryProvider = queryProvider;
125+
126+
return this;
127+
}
128+
129+
/**
130+
* The JPQL query string to execute. This should only be set if
131+
* {@link #queryProvider(JpaQueryProvider)} has not been set.
132+
*
133+
* @param queryString the JPQL query
134+
* @return this instance for method chaining
135+
* @see JpaCursorItemReader#setQueryString(String)
136+
*/
137+
public JpaCursorItemReaderBuilder<T> queryString(String queryString) {
138+
this.queryString = queryString;
139+
140+
return this;
141+
}
142+
143+
/**
144+
* The {@link EntityManagerFactory} to be used for executing the configured
145+
* {@link #queryString}.
146+
*
147+
* @param entityManagerFactory {@link EntityManagerFactory} used to create
148+
* {@link javax.persistence.EntityManager}
149+
* @return this instance for method chaining
150+
*/
151+
public JpaCursorItemReaderBuilder<T> entityManagerFactory(EntityManagerFactory entityManagerFactory) {
152+
this.entityManagerFactory = entityManagerFactory;
153+
154+
return this;
155+
}
156+
157+
/**
158+
* Returns a fully constructed {@link JpaCursorItemReader}.
159+
*
160+
* @return a new {@link JpaCursorItemReader}
161+
*/
162+
public JpaCursorItemReader<T> build() {
163+
Assert.notNull(this.entityManagerFactory, "An EntityManagerFactory is required");
164+
if (this.saveState) {
165+
Assert.hasText(this.name, "A name is required when saveState is set to true");
166+
}
167+
if (this.queryProvider == null) {
168+
Assert.hasLength(this.queryString, "Query string is required when queryProvider is null");
169+
}
170+
171+
JpaCursorItemReader<T> reader = new JpaCursorItemReader<>();
172+
reader.setEntityManagerFactory(this.entityManagerFactory);
173+
reader.setQueryProvider(this.queryProvider);
174+
reader.setQueryString(this.queryString);
175+
reader.setParameterValues(this.parameterValues);
176+
reader.setCurrentItemCount(this.currentItemCount);
177+
reader.setMaxItemCount(this.maxItemCount);
178+
reader.setSaveState(this.saveState);
179+
reader.setName(this.name);
180+
return reader;
181+
}
182+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.batch.item.database;
17+
18+
import org.springframework.batch.item.ExecutionContext;
19+
import org.springframework.batch.item.ItemReader;
20+
import org.springframework.batch.item.sample.Foo;
21+
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
22+
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
23+
24+
/**
25+
* @author Mahmoud Ben Hassine
26+
*/
27+
public class JpaCursorItemReaderCommonTests extends
28+
AbstractDatabaseItemStreamItemReaderTests {
29+
30+
@Override
31+
protected ItemReader<Foo> getItemReader() throws Exception {
32+
LocalContainerEntityManagerFactoryBean factoryBean =
33+
new LocalContainerEntityManagerFactoryBean();
34+
factoryBean.setDataSource(getDataSource());
35+
factoryBean.setPersistenceUnitName("bar");
36+
factoryBean.setJpaVendorAdapter(new HibernateJpaVendorAdapter());
37+
factoryBean.afterPropertiesSet();
38+
39+
String jpqlQuery = "from Foo";
40+
JpaCursorItemReader<Foo> itemReader = new JpaCursorItemReader<>();
41+
itemReader.setQueryString(jpqlQuery);
42+
itemReader.setEntityManagerFactory(factoryBean.getObject());
43+
itemReader.afterPropertiesSet();
44+
itemReader.setSaveState(true);
45+
return itemReader;
46+
}
47+
48+
@Override
49+
protected void pointToEmptyInput(ItemReader<Foo> tested) throws Exception {
50+
JpaCursorItemReader<Foo> reader = (JpaCursorItemReader<Foo>) tested;
51+
reader.close();
52+
reader.setQueryString("from Foo foo where foo.id = -1");
53+
reader.afterPropertiesSet();
54+
reader.open(new ExecutionContext());
55+
}
56+
}

0 commit comments

Comments
 (0)