@@ -122,30 +122,28 @@ public TransactionalFileOutput open(TaskSource taskSource, final int taskIndex)
122122 {
123123 private final List <String > hdfsFileNames = new ArrayList <>();
124124 private int fileIndex = 0 ;
125+ private Path currentPath = null ;
125126 private OutputStream output = null ;
126127
127128 @ Override
128129 public void nextFile ()
129130 {
130131 closeCurrentStream ();
131- Path path = new Path (pathPrefix + String .format (sequenceFormat , taskIndex , fileIndex ) + pathSuffix );
132- try {
133- FileSystem fs = getFs (task );
134- output = fs .create (path , task .getOverwrite ());
135- logger .info ("Uploading '{}'" , path );
136- }
137- catch (IOException e ) {
138- logger .error (e .getMessage ());
139- throw new RuntimeException (e );
140- }
141- hdfsFileNames .add (path .toString ());
132+ currentPath = new Path (pathPrefix + String .format (sequenceFormat , taskIndex , fileIndex ) + pathSuffix );
142133 fileIndex ++;
143134 }
144135
145136 @ Override
146137 public void add (Buffer buffer )
147138 {
148139 try {
140+ // this implementation is for creating file when there is data.
141+ if (output == null ) {
142+ FileSystem fs = getFs (task );
143+ output = fs .create (currentPath , task .getOverwrite ());
144+ logger .info ("Uploading '{}'" , currentPath );
145+ hdfsFileNames .add (currentPath .toString ());
146+ }
149147 output .write (buffer .array (), buffer .offset (), buffer .limit ());
150148 }
151149 catch (IOException e ) {
0 commit comments