
Commit 54b4505

Merge pull request #19 from civitaspo/atomic
Add mode option with replace mode
2 parents 213f330 + cf1d1a8 commit 54b4505

21 files changed: +1187 −208 lines

.travis.yml

Lines changed: 6 additions & 1 deletion

```diff
@@ -1,3 +1,4 @@
+dist: precise
 language: java
 jdk:
   - openjdk7
@@ -6,4 +7,8 @@ jdk:
 script:
   - ./gradlew test
 after_success:
-  - ./gradlew jacocoTestReport coveralls
+  - ./gradlew jacocoTestReport coveralls
+addons:
+  hosts:
+    - example.com
+  hostname: example.com
```

README.md

Lines changed: 38 additions & 9 deletions

```diff
@@ -14,26 +14,55 @@ A File Output Plugin for Embulk to write HDFS.
 
 ## Configuration
 
-- **config_files** list of paths to Hadoop's configuration files (array of strings, default: `[]`)
-- **config** overwrites configuration parameters (hash, default: `{}`)
-- **path_prefix** prefix of target files (string, required)
-- **file_ext** suffix of target files (string, required)
-- **sequence_format** format for sequence part of target files (string, default: `'%03d.%02d.'`)
-- **rewind_seconds** When you use Date format in path_prefix property(like `/tmp/embulk/%Y-%m-%d/out`), the format is interpreted by using the time which is Now minus this property. (int, default: `0`)
-- **overwrite** overwrite files when the same filenames already exists (boolean, default: `false`)
+- **config_files**: list of paths to Hadoop's configuration files (array of strings, default: `[]`)
+- **config**: overwrites configuration parameters (hash, default: `{}`)
+- **path_prefix**: prefix of target files (string, required)
+- **file_ext**: suffix of target files (string, required)
+- **sequence_format**: format for the sequence part of target files (string, default: `'%03d.%02d.'`)
+- **rewind_seconds**: when you use a date format in the path_prefix property (like `/tmp/embulk/%Y-%m-%d/out`), the format is interpreted using the current time minus this value (int, default: `0`)
+- **doas**: username used to access HDFS (string, default: the executing user)
+- **overwrite** *(deprecated: please use the `mode` option instead)*: overwrite files when the same filenames already exist (boolean, default: `false`)
   - *caution*: even if this property is `true`, this does not mean ensuring the idempotence. if you want to ensure the idempotence, you need the procedures to remove output files after or before running.
-- **doas** username which access to Hdfs (string, default: executed user)
-- **delete_in_advance** delete files and directories having `path_prefix` in advance (enum, default: `NONE`)
+- **delete_in_advance** *(deprecated: please use the `mode` option instead)*: delete files and directories having `path_prefix` in advance (enum, default: `NONE`)
   - `NONE`: do nothing
   - `FILE_ONLY`: delete files
   - `RECURSIVE`: delete files and directories
+- **mode**: `"abort_if_exist"`, `"overwrite"`, `"delete_files_in_advance"`, `"delete_recursive_in_advance"`, or `"replace"`. See below. (string, optional, default: `"abort_if_exist"`)
+  * In the future, the default mode will become `"replace"`.
 
 ## CAUTION
 If you use `hadoop` user (hdfs admin user) as `doas`, and if `delete_in_advance` is `RECURSIVE`,
 `embulk-output-hdfs` can delete any files and directories you indicate as `path_prefix`,
 this means `embulk-output-hdfs` can destroy your hdfs.
 So, please be careful when you use `delete_in_advance` option and `doas` option ...
+## About DELETE
+
+When this plugin deletes files or directories, it uses the [Hadoop Trash API](https://hadoop.apache.org/docs/r2.8.0/api/org/apache/hadoop/fs/Trash.html), so you can find them in the trash during `fs.trash.interval`.
+
+## Modes
+
+* **abort_if_exist**:
+  * Behavior: this mode writes rows to the target files in order. If the target files already exist, the transaction is aborted.
+  * Transactional: no. If it fails, the target files may have some rows written.
+  * Resumable: no.
+* **overwrite**:
+  * Behavior: this mode writes rows to the target files in order. If the target files already exist, they are rewritten from the beginning.
+  * Transactional: no. If it fails, the target files may have some rows written.
+  * Resumable: no.
+* **delete_files_in_advance**:
+  * Behavior: this mode deletes matching files first, then writes rows to the target files in order.
+  * Transactional: no. If it fails, the target files may have been removed.
+  * Resumable: no.
+* **delete_recursive_in_advance**:
+  * Behavior: this mode deletes matching directories recursively first, then writes rows to the target files in order.
+  * Transactional: no. If it fails, the target files may have been removed.
+  * Resumable: no.
+* **replace**:
+  * Behavior: this mode writes rows to workspace files in order, then moves them over the target directories. This **replace** is not **atomic**, because the HDFS API does not provide an atomic replace.
+  * Transactional: no. If it fails, the target files may have been removed.
+  * Resumable: no.
+
 
 ## Example
 ```yaml
```
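Taken together with the Modes list above, a minimal `out` section using the new option might look like this (a sketch; the `path_prefix` and `file_ext` values are illustrative, not part of this commit):

```yaml
out:
  type: hdfs
  config_files:
    - /etc/hadoop/conf/core-site.xml
    - /etc/hadoop/conf/hdfs-site.xml
  path_prefix: /tmp/embulk-output-hdfs_example/file_
  file_ext: csv
  mode: replace   # workspace files are written first, then moved over the target
  formatter:
    type: csv
```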

example/config.yml

Lines changed: 3 additions & 1 deletion

```diff
@@ -6,12 +6,14 @@ hdfs_example: &hdfs_example
     fs.defaultFS: 'hdfs://hadoop-nn1:8020'
     fs.hdfs.impl: 'org.apache.hadoop.hdfs.DistributedFileSystem'
     fs.file.impl: 'org.apache.hadoop.fs.LocalFileSystem'
+    fs.trash.interval: 3600
 
 local_fs_example: &local_fs_example
   config:
     fs.defaultFS: 'file:///'
     fs.hdfs.impl: 'org.apache.hadoop.fs.RawLocalFileSystem'
     fs.file.impl: 'org.apache.hadoop.fs.RawLocalFileSystem'
+    fs.trash.interval: 3600
     io.compression.codecs: 'org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec'
 
 in:
@@ -38,7 +40,7 @@ out:
   <<: *local_fs_example
   path_prefix: /tmp/embulk-output-hdfs_example/file_
   file_ext: csv
-  delete_in_advance: FILE_ONLY
+  mode: replace
   formatter:
     type: csv
     newline: CRLF
```
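The `fs.trash.interval` settings above tie into the README's "About DELETE" note: deletions go through Hadoop's trash rather than being permanent. A minimal standalone sketch of that Hadoop API (not code from this commit; the target path is illustrative):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Trash;

public class TrashSketch
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        conf.set("fs.trash.interval", "3600"); // minutes a trashed file is kept

        FileSystem fs = FileSystem.get(conf);
        Path target = new Path("/tmp/embulk-output-hdfs_example/file_000.00.csv");

        // Moves the path into the user's .Trash directory instead of deleting
        // it permanently; returns false when trash is disabled (interval 0).
        boolean trashed = Trash.moveToAppropriateTrash(fs, target, conf);
        System.out.println("moved to trash: " + trashed);
    }
}
```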
Lines changed: 52 additions & 0 deletions (new file)

```yaml
hdfs_example: &hdfs_example
  config_files:
    - /etc/hadoop/conf/core-site.xml
    - /etc/hadoop/conf/hdfs-site.xml
  config:
    fs.defaultFS: 'hdfs://hadoop-nn1:8020'
    fs.hdfs.impl: 'org.apache.hadoop.hdfs.DistributedFileSystem'
    fs.file.impl: 'org.apache.hadoop.fs.LocalFileSystem'

local_fs_example: &local_fs_example
  config:
    fs.defaultFS: 'file:///'
    fs.hdfs.impl: 'org.apache.hadoop.fs.RawLocalFileSystem'
    fs.file.impl: 'org.apache.hadoop.fs.RawLocalFileSystem'
    io.compression.codecs: 'org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec'

in:
  type: file
  path_prefix: example/data
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    header_line: true
    stop_on_invalid_record: true
    columns:
      - {name: id, type: long}
      - {name: account, type: long}
      - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
      - {name: purchase, type: timestamp, format: '%Y%m%d'}
      - {name: comment, type: string}


out:
  type: hdfs
  <<: *local_fs_example
  path_prefix: /tmp/embulk-output-hdfs_example/file_
  file_ext: csv
  delete_in_advance: FILE_ONLY
  formatter:
    type: csv
    newline: CRLF
    newline_in_field: LF
    header_line: true
    charset: UTF-8
    quote_policy: NONE
    quote: '"'
    escape: '\'
    null_string: ''
    default_timezone: UTC
```
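This example keeps the deprecated `delete_in_advance` option; going by the README's Modes list, the equivalent `out` section under the new option would be (a sketch):

```yaml
out:
  type: hdfs
  <<: *local_fs_example
  path_prefix: /tmp/embulk-output-hdfs_example/file_
  file_ext: csv
  mode: delete_files_in_advance   # stands in for delete_in_advance: FILE_ONLY
```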
src/main/java/org/embulk/output/hdfs/HdfsFileOutput.java

Lines changed: 160 additions & 0 deletions (new file)

```java
package org.embulk.output.hdfs;

import org.apache.hadoop.fs.Path;
import org.embulk.config.TaskReport;
import org.embulk.output.hdfs.HdfsFileOutputPlugin.PluginTask;
import org.embulk.output.hdfs.client.HdfsClient;
import org.embulk.spi.Buffer;
import org.embulk.spi.Exec;
import org.embulk.spi.FileOutput;
import org.embulk.spi.TransactionalFileOutput;
import org.embulk.spi.util.RetryExecutor;
import org.slf4j.Logger;

import java.io.IOException;
import java.io.OutputStream;

public class HdfsFileOutput
        implements FileOutput, TransactionalFileOutput
{
    private static final Logger logger = Exec.getLogger(HdfsFileOutput.class);
    private final RetryExecutor re = RetryExecutor.retryExecutor()
            .withRetryLimit(3)
            .withInitialRetryWait(500)         // ms
            .withMaxRetryWait(10 * 60 * 1000); // ms

    private final HdfsClient hdfsClient;
    private final int taskIdx;
    private final String pathPrefix;
    private final String sequenceFormat;
    private final String fileExt;
    private final boolean overwrite;

    private int fileIdx = 0;
    private Path currentPath = null;
    private OutputStream o = null;

    public HdfsFileOutput(PluginTask task, String pathPrefix, boolean overwrite, int taskIdx)
    {
        this.hdfsClient = HdfsClient.build(task);
        this.pathPrefix = pathPrefix;
        this.taskIdx = taskIdx;
        this.sequenceFormat = task.getSequenceFormat();
        this.fileExt = task.getFileExt();
        this.overwrite = overwrite;
    }

    @Override
    public void abort()
    {
    }

    @Override
    public TaskReport commit()
    {
        return Exec.newTaskReport();
    }

    @Override
    public void nextFile()
    {
        closeCurrentStream();
        currentPath = newPath();
        fileIdx++;
    }

    @Override
    public void add(Buffer buffer)
    {
        try {
            // Create the file lazily, so it only exists when there is data to write.
            if (o == null) {
                o = hdfsClient.create(currentPath, overwrite);
                logger.info("Uploading '{}'", currentPath);
            }
            write(buffer);
        }
        catch (RetryExecutor.RetryGiveupException e) {
            throw new RuntimeException(e);
        }
        finally {
            buffer.release();
        }
    }

    @Override
    public void finish()
    {
        closeCurrentStream();
    }

    @Override
    public void close()
    {
        closeCurrentStream();
        hdfsClient.close();
    }

    private void write(final Buffer buffer)
            throws RetryExecutor.RetryGiveupException
    {
        re.run(new RetryExecutor.Retryable<Void>()
        {
            @Override
            public Void call()
                    throws Exception
            {
                o.write(buffer.array(), buffer.offset(), buffer.limit());
                return null;
            }

            @Override
            public boolean isRetryableException(Exception exception)
            {
                return true; // TODO: which exceptions are retryable?
            }

            @Override
            public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
                    throws RetryExecutor.RetryGiveupException
            {
                String m = String.format(
                        "%s. (Retry: Count: %d, Limit: %d, Wait: %d ms)",
                        exception.getMessage(),
                        retryCount,
                        retryLimit,
                        retryWait);
                logger.warn(m, exception);
            }

            @Override
            public void onGiveup(Exception firstException, Exception lastException)
                    throws RetryExecutor.RetryGiveupException
            {
            }
        });
    }

    private Path newPath()
    {
        return new Path(pathPrefix + getSequence() + fileExt);
    }

    private String getSequence()
    {
        return String.format(sequenceFormat, taskIdx, fileIdx);
    }

    private void closeCurrentStream()
    {
        if (o != null) {
            try {
                o.close();
                o = null;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
```
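For orientation, here is a minimal sketch of the call sequence Embulk's file-output framework performs on this class (the path prefix, flag values, and the wrapper class are illustrative, not part of the commit):

```java
package org.embulk.output.hdfs;

import org.embulk.config.TaskReport;
import org.embulk.output.hdfs.HdfsFileOutputPlugin.PluginTask;
import org.embulk.spi.Buffer;

// Sketch only: in practice Embulk drives a TransactionalFileOutput itself.
public class HdfsFileOutputUsageSketch
{
    public static TaskReport writeOneFile(PluginTask task, Buffer buffer)
    {
        // taskIdx 0 and overwrite=false are illustrative values.
        HdfsFileOutput output = new HdfsFileOutput(task, "/tmp/embulk/file_", false, 0);
        try {
            output.nextFile();  // choose path_prefix + sequence + file_ext
            output.add(buffer); // opens the stream lazily, writes with retries, releases the buffer
            output.finish();    // closes the current stream
            return output.commit();
        }
        finally {
            output.close();     // also closes the underlying HDFS client
        }
    }
}
```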
