Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/java
{
"name": "Java",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/java:1-11-bookworm",

"features": {
"ghcr.io/devcontainers/features/java:1": {
"version": "none",
"installMaven": "true",
"installGradle": "false"
}
}

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],

// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "java -version",

// Configure tool-specific properties.
// "customizations": {},

// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}
43 changes: 42 additions & 1 deletion src/main/java/org/archive/hadoop/jobs/WEATGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.archive.extract.ExtractorOutput;
import org.archive.extract.ProducerUtils;
import org.archive.extract.ResourceFactoryMapper;
import org.archive.extract.RealCDXExtractorOutput;
import org.archive.extract.WATExtractorOutput;
import org.archive.extract.WETExtractorOutput;
import org.archive.format.json.JSONUtils;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class WEATGenerator extends Configured implements Tool {
public final static String TOOL_DESCRIPTION = "Generate WAT and WET files from (W)ARC files stored in HDFS";

public static final Log LOG = LogFactory.getLog(WEATGenerator.class);

public static class WEATGeneratorMapper extends MapReduceBase implements Mapper<Text, Text, Text, Text> {
private JobConf jobConf;

Expand Down Expand Up @@ -89,21 +90,30 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter

Path inputPath = new Path(path);
Path basePath = inputPath.getParent().getParent();
Path cdxBasePath = new Path(jobConf.get("cdxBasePath", basePath.toString()));

String inputBasename = inputPath.getName();
String watOutputBasename = "";
String wetOutputBasename = "";
String cdxWatOutputBasename = "";
String cdxWetOutputBasename = "";

if(path.endsWith(".gz")) {
watOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".wat.gz";
wetOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".wet.gz";
cdxWatOutputBasename = inputBasename.replace(".warc.gz", ".wat.cdx.gz");
cdxWetOutputBasename = inputBasename.replace(".warc.gz", ".wet.cdx.gz");
} else {
watOutputBasename = inputBasename + ".wat.gz";
wetOutputBasename = inputBasename + ".wet.gz";
cdxWatOutputBasename = inputBasename.replace(".warc", ".wat.cdx.gz");
cdxWetOutputBasename = inputBasename.replace(".warc", ".wet.cdx.gz");
}

String watOutputFileString = basePath.toString() + "/wat/" + watOutputBasename;
String wetOutputFileString = basePath.toString() + "/wet/" + wetOutputBasename;
String cdxWetOutputFileString = cdxBasePath.toString() + "/" + cdxWetOutputBasename;
String cdxWatOutputFileString = cdxBasePath.toString() + "/" + cdxWatOutputBasename;

LOG.info("About to write out to " + watOutputFileString + " and " + wetOutputFileString);
if (this.jobConf.getBoolean("skipExisting", false)) {
Expand All @@ -123,6 +133,17 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter
FSDataOutputStream wetfsdOut = FileSystem.get(new java.net.URI(wetOutputFileString), this.jobConf).create(new Path(wetOutputFileString), false);
ExtractorOutput wetOut = new WETExtractorOutput(wetfsdOut, wetOutputBasename);

FSDataOutputStream cdxWetfsOut = null;
ExtractorOutput cdxWetOut = null;
FSDataOutputStream cdxWatfsOut = null;
ExtractorOutput cdxWatOut = null;
if ( this.jobConf.getBoolean("outputCDX", false) ) {
cdxWetfsOut = FileSystem.get(new java.net.URI(cdxWetOutputFileString), this.jobConf).create(new Path(cdxWetOutputFileString), false);
cdxWetOut = new RealCDXExtractorOutput(new PrintWriter(cdxWetfsOut));
cdxWatfsOut = FileSystem.get(new java.net.URI(cdxWatOutputFileString), this.jobConf).create(new Path(cdxWatOutputFileString), false);
cdxWatOut = new RealCDXExtractorOutput(new PrintWriter(cdxWatfsOut));
}

int count = 0;
Resource lr = null;
while(count < Integer.MAX_VALUE) {
Expand All @@ -143,6 +164,9 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter
LOG.info("Outputting new record " + count);
}
watOut.output(r);
if( cdxWatOut != null ) {
cdxWatOut.output(r);
}
if (lr != null && isMetaConcurrentTo(r, lr)) {
JSONArray payloadMetadata = JSONUtils.extractArray(r.getMetaData().getTopMetaData(),
"Envelope.Payload-Metadata.WARC-Metadata-Metadata.Metadata-Records");
Expand All @@ -152,6 +176,9 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter
}
if (lr != null) {
wetOut.output(lr);
if( cdxWetOut != null ) {
cdxWetOut.output(lr);
}
}
lr = r;
}
Expand All @@ -160,6 +187,12 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter
}
watfsdOut.close();
wetfsdOut.close();
if ( cdxWatfsOut != null ) {
cdxWatfsOut.close();
}
if ( cdxWetfsOut != null ) {
cdxWetfsOut.close();
}
} catch ( Exception e ) {
LOG.error( "Error processing file: " + path, e );
reporter.incrCounter("exporter", "errors", 1);
Expand Down Expand Up @@ -259,6 +292,7 @@ public int run( String[] args ) throws Exception {
// keep the job running even if generating WAT/WET files fails for some inputs
job.setBoolean("strictMode",false);
job.setBoolean("skipExisting", false);
job.setBoolean("outputCDX", false);

job.setOutputFormat(NullOutputFormat.class);
job.setOutputKeyClass(Text.class);
Expand All @@ -274,6 +308,13 @@ public int run( String[] args ) throws Exception {
} else if(args[arg].equals("-skipExisting")) {
job.setBoolean("skipExisting", true);
arg++;
} else if(args[arg].equals("-outputCDX")) {
job.setBoolean("outputCDX", true);
arg++;
} else if(args[arg].equals("-cdxBasePath")) {
job.set("cdxBasePath", args[arg+1]);
arg++;
arg++;
} else {
break;
}
Expand Down