6060import org .apache .beam .sdk .extensions .avro .coders .AvroCoder ;
6161import org .apache .beam .sdk .io .parquet .ParquetIO ;
6262import org .apache .beam .sdk .testing .TestPipeline ;
63+ import org .apache .beam .sdk .testing .TestPipelineExtension ;
6364import org .apache .beam .sdk .transforms .Create ;
6465import org .apache .beam .sdk .transforms .DoFn ;
6566import org .apache .beam .sdk .transforms .GroupIntoBatches ;
7071import org .apache .beam .sdk .transforms .WithKeys ;
7172import org .apache .beam .sdk .values .KV ;
7273import org .apache .beam .sdk .values .PCollection ;
73- import org .checkerframework .checker .nullness .qual .NonNull ;
74- import org .junit .ClassRule ;
75- import org .junit .Rule ;
76- import org .junit .Test ;
74+ import org .junit .jupiter .api .Test ;
75+ import org .junit .jupiter .api .extension .ExtendWith ;
7776import org .neo4j .cypherdsl .core .Cypher ;
7877import org .neo4j .cypherdsl .core .Expression ;
7978import org .neo4j .cypherdsl .core .MapExpression ;
102101import org .neo4j .importer .v1 .sources .Source ;
103102import org .neo4j .importer .v1 .sources .SourceProvider ;
104103import org .neo4j .importer .v1 .targets .PropertyType ;
105- import org .testcontainers .containers .Neo4jContainer ;
104+ import org .testcontainers .junit .jupiter .Container ;
105+ import org .testcontainers .junit .jupiter .Testcontainers ;
106+ import org .testcontainers .neo4j .Neo4jContainer ;
106107import org .testcontainers .utility .DockerImageName ;
107108
109+ @ ExtendWith (TestPipelineExtension .class )
110+ @ Testcontainers
108111public class BeamExampleIT {
109112
110- @ Rule
111- public final TestPipeline pipeline = TestPipeline .create ();
112-
113- @ ClassRule
114- public static Neo4jContainer <?> NEO4J = new Neo4jContainer <>(DockerImageName .parse ("neo4j:5-enterprise" ))
113+ @ Container
114+ public static Neo4jContainer NEO4J = new Neo4jContainer (DockerImageName .parse ("neo4j:5-enterprise" ))
115115 .withEnv ("NEO4J_ACCEPT_LICENSE_AGREEMENT" , "yes" )
116116 .withAdminPassword ("letmein!" );
117117
118118 @ Test
119- public void imports_dvd_rental_data_set () throws Exception {
119+ public void imports_dvd_rental_data_set (TestPipeline pipeline ) throws Exception {
120120 try (InputStream stream = this .getClass ().getResourceAsStream ("/specs/dvd_rental.yaml" )) {
121121 assertThat (stream ).isNotNull ();
122122
@@ -125,9 +125,9 @@ public void imports_dvd_rental_data_set() throws Exception {
125125 var importPipeline = ImportPipeline .of (ImportSpecificationDeserializer .deserialize (reader ));
126126 importPipeline .forEach (step -> {
127127 switch (step ) {
128- case SourceStep source -> handleSource (source , outputs );
129- case ActionStep action -> handleAction (action , outputs );
130- case TargetStep target -> handleTarget (target , outputs );
128+ case SourceStep source -> handleSource (source , pipeline , outputs );
129+ case ActionStep action -> handleAction (action , pipeline , outputs );
130+ case TargetStep target -> handleTarget (target , pipeline , outputs );
131131 default -> throw new IllegalStateException ("Unexpected value: " + step );
132132 }
133133 });
@@ -152,7 +152,7 @@ public void imports_dvd_rental_data_set() throws Exception {
152152 }
153153 }
154154
155- private void handleSource (SourceStep step , Map <String , PCollection <?>> outputs ) {
155+ private void handleSource (SourceStep step , TestPipeline pipeline , Map <String , PCollection <?>> outputs ) {
156156 var name = step .name ();
157157 var source = step .source ();
158158 assertThat (source ).isInstanceOf (ParquetSource .class );
@@ -165,7 +165,7 @@ private void handleSource(SourceStep step, Map<String, PCollection<?>> outputs)
165165 outputs .put (source .getName (), output );
166166 }
167167
168- private void handleAction (ActionStep step , Map <String , PCollection <?>> outputs ) {
168+ private void handleAction (ActionStep step , TestPipeline pipeline , Map <String , PCollection <?>> outputs ) {
169169 var actionName = step .name ();
170170 var action = step .action ();
171171 assertThat (action ).isInstanceOf (CypherAction .class );
@@ -183,7 +183,7 @@ private void handleAction(ActionStep step, Map<String, PCollection<?>> outputs)
183183 }
184184
185185 @ SuppressWarnings ("unchecked" )
186- private void handleTarget (TargetStep step , Map <String , PCollection <?>> outputs ) {
186+ private void handleTarget (TargetStep step , TestPipeline pipeline , Map <String , PCollection <?>> outputs ) {
187187 var stepName = step .name ();
188188 assertThat (step ).isInstanceOf (EntityTargetStep .class );
189189 var entityTargetStep = (EntityTargetStep ) step ;
@@ -347,8 +347,7 @@ public String getName() {
347347 }
348348 }
349349
350- private static class TargetSchemaIO
351- extends PTransform <@ NonNull PCollection <Integer >, @ NonNull PCollection <WriteCounters >> {
350+ private static class TargetSchemaIO extends PTransform <PCollection <Integer >, PCollection <WriteCounters >> {
352351
353352 private final String url ;
354353
@@ -362,13 +361,13 @@ private TargetSchemaIO(String url, String password, EntityTargetStep target) {
362361 this .target = target ;
363362 }
364363
365- public static PTransform <@ NonNull PCollection <Integer >, @ NonNull PCollection <WriteCounters >> initSchema (
364+ public static PTransform <PCollection <Integer >, PCollection <WriteCounters >> initSchema (
366365 String url , String password , EntityTargetStep target ) {
367366 return new TargetSchemaIO (url , password , target );
368367 }
369368
370369 @ Override
371- public @ NonNull PCollection <WriteCounters > expand (@ NonNull PCollection <Integer > input ) {
370+ public PCollection <WriteCounters > expand (PCollection <Integer > input ) {
372371 return input .apply (ParDo .of (TargetSchemaWriteFn .of (url , password , target )));
373372 }
374373
@@ -552,8 +551,7 @@ private static String propertyType(PropertyType propertyType) {
552551 }
553552
554553 private static class TargetIO
555- extends PTransform <
556- @ NonNull PCollection <KV <Integer , Iterable <GenericRecord >>>, @ NonNull PCollection <WriteCounters >> {
554+ extends PTransform <PCollection <KV <Integer , Iterable <GenericRecord >>>, PCollection <WriteCounters >> {
557555
558556 private final String url ;
559557
@@ -567,14 +565,13 @@ private TargetIO(String url, String password, EntityTargetStep target) {
567565 this .target = target ;
568566 }
569567
570- public static PTransform <
571- @ NonNull PCollection <KV <Integer , Iterable <GenericRecord >>>, @ NonNull PCollection <WriteCounters >>
568+ public static PTransform <PCollection <KV <Integer , Iterable <GenericRecord >>>, PCollection <WriteCounters >>
572569 writeAll (String boltUrl , String adminPassword , EntityTargetStep target ) {
573570 return new TargetIO (boltUrl , adminPassword , target );
574571 }
575572
576573 @ Override
577- public @ NonNull PCollection <WriteCounters > expand (PCollection <KV <Integer , Iterable <GenericRecord >>> input ) {
574+ public PCollection <WriteCounters > expand (PCollection <KV <Integer , Iterable <GenericRecord >>> input ) {
578575 return input .apply (ParDo .of (TargetWriteFn .of (url , password , target )));
579576 }
580577
@@ -774,7 +771,7 @@ public static Coder<GenericRecord> create() {
774771 }
775772
776773 @ Override
777- public void encode (GenericRecord value , @ NonNull OutputStream outStream ) throws IOException {
774+ public void encode (GenericRecord value , OutputStream outStream ) throws IOException {
778775 assertThat (value ).isNotNull ();
779776 var schema = value .getSchema ();
780777 String schemaString = schema .toString ();
@@ -786,7 +783,7 @@ public void encode(GenericRecord value, @NonNull OutputStream outStream) throws
786783 }
787784
788785 @ Override
789- public GenericRecord decode (@ NonNull InputStream inStream ) throws IOException {
786+ public GenericRecord decode (InputStream inStream ) throws IOException {
790787 String schemaString = StringUtf8Coder .of ().decode (inStream );
791788 String schemaHash = StringUtf8Coder .of ().decode (inStream );
792789 AvroCoder <GenericRecord > coder =
@@ -1005,8 +1002,7 @@ public String toString() {
10051002 }
10061003 }
10071004
1008- private static class CypherActionIO
1009- extends PTransform <@ NonNull PCollection <Integer >, @ NonNull PCollection <Integer >> {
1005+ private static class CypherActionIO extends PTransform <PCollection <Integer >, PCollection <Integer >> {
10101006
10111007 private final CypherAction action ;
10121008
@@ -1020,13 +1016,13 @@ private CypherActionIO(CypherAction action, String url, String password) {
10201016 this .password = password ;
10211017 }
10221018
1023- public static PTransform <@ NonNull PCollection <Integer >, @ NonNull PCollection <Integer >> run (
1019+ public static PTransform <PCollection <Integer >, PCollection <Integer >> run (
10241020 CypherAction action , String url , String password ) {
10251021 return new CypherActionIO (action , url , password );
10261022 }
10271023
10281024 @ Override
1029- public @ NonNull PCollection <Integer > expand (@ NonNull PCollection <Integer > input ) {
1025+ public PCollection <Integer > expand (PCollection <Integer > input ) {
10301026 return input .apply (ParDo .of (new CypherActionFn (action , url , password )));
10311027 }
10321028 }
0 commit comments