|
1 | 1 | package org.embulk.output.hdfs; |
2 | 2 |
|
3 | | -import java.io.File; |
4 | | -import java.io.IOException; |
5 | | -import java.io.OutputStream; |
6 | | -import java.util.ArrayList; |
7 | | -import java.util.List; |
8 | | -import java.util.Map; |
9 | | - |
| 3 | +import com.google.common.base.Optional; |
| 4 | +import com.google.common.base.Throwables; |
10 | 5 | import org.apache.hadoop.conf.Configuration; |
11 | 6 | import org.apache.hadoop.fs.FileSystem; |
12 | 7 | import org.apache.hadoop.fs.Path; |
13 | | -import org.embulk.config.TaskReport; |
14 | 8 | import org.embulk.config.Config; |
15 | 9 | import org.embulk.config.ConfigDefault; |
16 | 10 | import org.embulk.config.ConfigDiff; |
17 | 11 | import org.embulk.config.ConfigSource; |
18 | 12 | import org.embulk.config.Task; |
| 13 | +import org.embulk.config.TaskReport; |
19 | 14 | import org.embulk.config.TaskSource; |
20 | 15 | import org.embulk.spi.Buffer; |
21 | 16 | import org.embulk.spi.Exec; |
|
24 | 19 | import org.jruby.embed.ScriptingContainer; |
25 | 20 | import org.slf4j.Logger; |
26 | 21 |
|
| 22 | +import java.io.File; |
| 23 | +import java.io.IOException; |
| 24 | +import java.io.OutputStream; |
| 25 | +import java.net.URI; |
| 26 | +import java.util.ArrayList; |
| 27 | +import java.util.List; |
| 28 | +import java.util.Map; |
| 29 | + |
27 | 30 | public class HdfsFileOutputPlugin |
28 | 31 | implements FileOutputPlugin |
29 | 32 | { |
@@ -58,6 +61,9 @@ public interface PluginTask |
58 | 61 | @ConfigDefault("false") |
59 | 62 | public boolean getOverwrite(); |
60 | 63 |
|
| 64 | + @Config("doas") |
| 65 | + @ConfigDefault("null") |
| 66 | + public Optional<String> getDoas(); |
61 | 67 | } |
62 | 68 |
|
63 | 69 | @Override |
@@ -186,6 +192,15 @@ private static FileSystem getFs(final PluginTask task) |
186 | 192 | configuration.set(entry.getKey(), entry.getValue()); |
187 | 193 | } |
188 | 194 |
|
| 195 | + if (task.getDoas().isPresent()) { |
| 196 | + URI uri = FileSystem.getDefaultUri(configuration); |
| 197 | + try { |
| 198 | + return FileSystem.get(uri, configuration, task.getDoas().get()); |
| 199 | + } |
| 200 | + catch (InterruptedException e) { |
| 201 | + throw Throwables.propagate(e); |
| 202 | + } |
| 203 | + } |
189 | 204 | return FileSystem.get(configuration); |
190 | 205 | } |
191 | 206 |
|
|
0 commit comments