44namespace ScriptFUSION \Porter ;
55
66use Amp \Iterator ;
7- use Amp \Producer ;
87use Amp \Promise ;
98use Psr \Container \ContainerInterface ;
9+ use ScriptFUSION \Porter \Collection \AsyncPorterRecords ;
10+ use ScriptFUSION \Porter \Collection \AsyncProviderRecords ;
11+ use ScriptFUSION \Porter \Collection \AsyncRecordCollection ;
12+ use ScriptFUSION \Porter \Collection \CountableAsyncPorterRecords ;
1013use ScriptFUSION \Porter \Collection \CountablePorterRecords ;
1114use ScriptFUSION \Porter \Collection \CountableProviderRecords ;
1215use ScriptFUSION \Porter \Collection \PorterRecords ;
@@ -125,13 +128,19 @@ private function fetch(ImportSpecification $specification): \Iterator
125128 return $ resource ->fetch (ImportConnectorFactory::create ($ connector , $ specification ));
126129 }
127130
128- public function importAsync (AsyncImportSpecification $ specification ): Iterator
131+ public function importAsync (AsyncImportSpecification $ specification ): AsyncRecordCollection
129132 {
130133 $ specification = clone $ specification ;
131134
132135 $ records = $ this ->fetchAsync ($ specification );
133136
134- return $ this ->transformAsync ($ records , $ specification ->getTransformers (), $ specification ->getContext ());
137+ if (!$ records instanceof AsyncProviderRecords) {
138+ $ records = new AsyncProviderRecords ($ records , $ specification ->getAsyncResource ());
139+ }
140+
141+ $ records = $ this ->transformAsync ($ records , $ specification ->getTransformers (), $ specification ->getContext ());
142+
143+ return $ this ->createAsyncPorterRecords ($ records , $ specification );
135144 }
136145
137146 public function importOneAsync (AsyncImportSpecification $ specification ): Promise
@@ -201,38 +210,25 @@ private function transformRecords(RecordCollection $records, array $transformers
201210 return $ records ;
202211 }
203212
204- private function transformAsync (Iterator $ records , array $ transformers , $ context ): Producer
205- {
206- return new Producer (function (\Closure $ emit ) use ($ records , $ transformers , $ context ) {
207- while (yield $ records ->advance ()) {
208- $ record = $ records ->getCurrent ();
209-
210- foreach ($ transformers as $ transformer ) {
211- if (!$ transformer instanceof AsyncTransformer) {
212- // TODO: Proper exception or separate async stack.
213- throw new \RuntimeException ('Cannot use sync transformer. ' );
214- }
215- if ($ transformer instanceof PorterAware) {
216- $ transformer ->setPorter ($ this );
217- }
218-
219- $ record = yield $ transformer ->transformAsync ($ record , $ context );
220-
221- if ($ record === null ) {
222- // Do not process more transformers.
223- break ;
224- }
225- }
226-
227- if ($ record !== null ) {
228- if (!\is_array ($ record )) {
229- throw new \RuntimeException ('Unexpected type: record must be array or null. ' );
230- }
231-
232- yield $ emit ($ record );
233- }
213+ private function transformAsync (
214+ AsyncRecordCollection $ records ,
215+ array $ transformers ,
216+ $ context
217+ ): AsyncRecordCollection {
218+ foreach ($ transformers as $ transformer ) {
219+ if (!$ transformer instanceof AsyncTransformer) {
220+ // TODO: Proper exception or separate async stack.
221+ throw new \RuntimeException ('Cannot use sync transformer. ' );
234222 }
235- });
223+
224+ if ($ transformer instanceof PorterAware) {
225+ $ transformer ->setPorter ($ this );
226+ }
227+
228+ $ records = $ transformer ->transformAsync ($ records , $ context );
229+ }
230+
231+ return $ records ;
236232 }
237233
238234 private function createProviderRecords (\Iterator $ records , ProviderResource $ resource ): ProviderRecords
@@ -253,6 +249,17 @@ private function createPorterRecords(RecordCollection $records, ImportSpecificat
253249 return new PorterRecords ($ records , $ specification );
254250 }
255251
252+ private function createAsyncPorterRecords (
253+ AsyncRecordCollection $ records ,
254+ AsyncImportSpecification $ specification
255+ ): AsyncPorterRecords {
256+ if ($ records instanceof \Countable) {
257+ return new CountableAsyncPorterRecords ($ records , \count ($ records ), $ specification );
258+ }
259+
260+ return new AsyncPorterRecords ($ records , $ specification );
261+ }
262+
256263 /**
257264 * Gets the provider matching the specified name.
258265 *
0 commit comments