From cb6188f4f9b47a9126d47a0a16fc6ff9e8f22e87 Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Tue, 2 Jul 2024 16:02:45 -0500 Subject: [PATCH 1/4] laying down TODO's for integrity integration --- src/main/java/org/archive/hadoop/jobs/WEATGenerator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java b/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java index 24c7799..f0d4c0c 100644 --- a/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java +++ b/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java @@ -54,6 +54,9 @@ public class WEATGenerator extends Configured implements Tool { public static final Log LOG = LogFactory.getLog(WEATGenerator.class); + // TODO: modify this class to also output cdx files. Look at hadoop plugins for fetch and/or other classes in this project for hints. + // TODO: new property for cdx paths or something is likely needed, same as fetch... + public static class WEATGeneratorMapper extends MapReduceBase implements Mapper { private JobConf jobConf; From 42f6479d67dca717b923cccc671ea75f0bc2928d Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Sun, 21 Jul 2024 21:06:11 -0500 Subject: [PATCH 2/4] Added CDX generation (-outputCDX) to the WEATGenerator - may not compile yet, local env not available today. --- .../archive/hadoop/jobs/WEATGenerator.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java b/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java index f0d4c0c..b957d63 100644 --- a/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java +++ b/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java @@ -25,6 +25,7 @@ import org.archive.extract.ExtractorOutput; import org.archive.extract.ProducerUtils; import org.archive.extract.ResourceFactoryMapper; +import org.archive.extract.RealCDXExtractorOutput; import org.archive.extract.WATExtractorOutput; import org.archive.extract.WETExtractorOutput; import org.archive.format.json.JSONUtils; @@ -96,17 +97,25 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter String inputBasename = inputPath.getName(); String watOutputBasename = ""; String wetOutputBasename = ""; + String cdxWatOutputBasename = ""; + String cdxWetOutputBasename = ""; if(path.endsWith(".gz")) { watOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".wat.gz"; wetOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".wet.gz"; + cdxWatOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".cdxwat.gz"; + cdxWetOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".cdxwet.gz"; } else { watOutputBasename = inputBasename + ".wat.gz"; wetOutputBasename = inputBasename + ".wet.gz"; + cdxWatOutputBasename = inputBasename + ".cdxwat.gz"; + cdxWetOutputBasename = inputBasename + ".cdxwet.gz"; } String watOutputFileString = basePath.toString() + "/wat/" + watOutputBasename; String wetOutputFileString = basePath.toString() + "/wet/" + wetOutputBasename; + String cdxWetOutputFileString = basePath.toString() + "/cdxwet/" + cdxWetOutputBasename; + String cdxWatOutputFileString = basePath.toString() + "/cdxwat/" + cdxWatOutputBasename; LOG.info("About to write out to " + watOutputFileString + " and " + wetOutputFileString); if (this.jobConf.getBoolean("skipExisting", false)) { @@ -126,6 +135,21 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter FSDataOutputStream wetfsdOut = FileSystem.get(new java.net.URI(wetOutputFileString), this.jobConf).create(new Path(wetOutputFileString), false); ExtractorOutput wetOut = new WETExtractorOutput(wetfsdOut, wetOutputBasename); + FSDataOutputStream cdxWetfsOut = null; + ExtractorOutput cdxWetOut = null; + FSDataOutputStream cdxWatfsOut = null; + ExtractorOutput cdxWatOut = null; + if ( this.jobConf.getBoolean("outputCDX") ) { + cdxWetfsOut = FileSystem.get(new java.net.URI(cdxWetOutputFileString), this.jobConf).create(new Path(cdxWetOutputFileString), false); + cdxWetOut = new RealCDXExtractorOutput(new PrintWriter(cdxWetfsOut)); + cdxWatfsOut = FileSystem.get(new java.net.URI(cdxWatOutputFileString), this.jobConf).create(new Path(cdxWatOutputFileString), false); + cdxWatOut = new RealCDXExtractorOutput(new PrintWriter(cdxWatfsOut)); + } + FSDataOutputStream cdxWetfsOut = FileSystem.get(new java.net.URI(cdxWetOutputFileString), this.jobConf).create(new Path(cdxWetOutputFileString), false); + ExtractorOutput cdxWetOut = new RealCDXExtractorOutput(new PrintWriter(cdxWetfsOut)); + FSDataOutputStream cdxWatfsOut = FileSystem.get(new java.net.URI(cdxWatOutputFileString), this.jobConf).create(new Path(cdxWatOutputFileString), false); + ExtractorOutput cdxWatOut = new RealCDXExtractorOutput(new PrintWriter(cdxWatfsOut)); + int count = 0; Resource lr = null; while(count < Integer.MAX_VALUE) { @@ -146,6 +170,9 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter LOG.info("Outputting new record " + count); } watOut.output(r); + if( cdxWatOut != null ) { + cdxWatOut.output(r); + } if (lr != null && isMetaConcurrentTo(r, lr)) { JSONArray payloadMetadata = JSONUtils.extractArray(r.getMetaData().getTopMetaData(), "Envelope.Payload-Metadata.WARC-Metadata-Metadata.Metadata-Records"); @@ -155,6 +182,9 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter } if (lr != null) { wetOut.output(lr); + if( cdxWetOut != null ) { + cdxWetOut.output(lr); + } } lr = r; } @@ -163,6 +193,12 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter } watfsdOut.close(); wetfsdOut.close(); + if ( cdxWatfsdOut != null ) { + cdxWatfsdOut.close(); + } + if ( cdxWetfsdOut != null ) { + cdxWetfsdOut.close(); + } } catch ( Exception e ) { LOG.error( "Error processing file: " + path, e ); reporter.incrCounter("exporter", "errors", 1); @@ -262,6 +298,7 @@ public int run( String[] args ) throws Exception { // keep job running despite some failures in generating WATs job.setBoolean("strictMode",false); job.setBoolean("skipExisting", false); + job.setBoolean("outputCDX", false); job.setOutputFormat(NullOutputFormat.class); job.setOutputKeyClass(Text.class); @@ -277,6 +314,9 @@ public int run( String[] args ) throws Exception { } else if(args[arg].equals("-skipExisting")) { job.setBoolean("skipExisting", true); arg++; + } else if(args[arg].equals("-outputCDX")) { + job.setBoolean("outputCDX", true); + arg++; } else { break; } From 10f91c0858c7ae0d68e52e963c0cb3301de7e60d Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Sun, 4 Aug 2024 18:48:56 +0000 Subject: [PATCH 3/4] Fixing a few java bugs --- .devcontainer/devcontainer.json | 27 +++++++++++++++++++ .../archive/hadoop/jobs/WEATGenerator.java | 14 ++++------ 2 files changed, 32 insertions(+), 9 deletions(-) create mode 100644 .devcontainer/devcontainer.json diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..9d10725 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,27 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the +// README at: https://github.com/devcontainers/templates/tree/main/src/java +{ + "name": "Java", + // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile + "image": "mcr.microsoft.com/devcontainers/java:1-11-bookworm", + + "features": { + "ghcr.io/devcontainers/features/java:1": { + "version": "none", + "installMaven": "true", + "installGradle": "false" + } + } + + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [], + + // Use 'postCreateCommand' to run commands after the container is created. + // "postCreateCommand": "java -version", + + // Configure tool-specific properties. + // "customizations": {}, + + // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. + // "remoteUser": "root" +} diff --git a/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java b/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java index b957d63..382b8e6 100644 --- a/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java +++ b/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java @@ -139,16 +139,12 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter ExtractorOutput cdxWetOut = null; FSDataOutputStream cdxWatfsOut = null; ExtractorOutput cdxWatOut = null; - if ( this.jobConf.getBoolean("outputCDX") ) { + if ( this.jobConf.getBoolean("outputCDX", false) ) { cdxWetfsOut = FileSystem.get(new java.net.URI(cdxWetOutputFileString), this.jobConf).create(new Path(cdxWetOutputFileString), false); cdxWetOut = new RealCDXExtractorOutput(new PrintWriter(cdxWetfsOut)); cdxWatfsOut = FileSystem.get(new java.net.URI(cdxWatOutputFileString), this.jobConf).create(new Path(cdxWatOutputFileString), false); cdxWatOut = new RealCDXExtractorOutput(new PrintWriter(cdxWatfsOut)); } - FSDataOutputStream cdxWetfsOut = FileSystem.get(new java.net.URI(cdxWetOutputFileString), this.jobConf).create(new Path(cdxWetOutputFileString), false); - ExtractorOutput cdxWetOut = new RealCDXExtractorOutput(new PrintWriter(cdxWetfsOut)); - FSDataOutputStream cdxWatfsOut = FileSystem.get(new java.net.URI(cdxWatOutputFileString), this.jobConf).create(new Path(cdxWatOutputFileString), false); - ExtractorOutput cdxWatOut = new RealCDXExtractorOutput(new PrintWriter(cdxWatfsOut)); int count = 0; Resource lr = null; @@ -193,11 +189,11 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter } watfsdOut.close(); wetfsdOut.close(); - if ( cdxWatfsdOut != null ) { - cdxWatfsdOut.close(); + if ( cdxWatfsOut != null ) { + cdxWatfsOut.close(); } - if ( cdxWetfsdOut != null ) { - cdxWetfsdOut.close(); + if ( cdxWetfsOut != null ) { + cdxWetfsOut.close(); } } catch ( Exception e ) { LOG.error( "Error processing file: " + path, e ); From e734d5b751725f04e0cdcfc8246588e3a00de441 Mon Sep 17 00:00:00 2001 From: Jason Grey Date: Sun, 4 Aug 2024 19:07:10 +0000 Subject: [PATCH 4/4] fix up paths/naming per Sebastian's comments --- .../archive/hadoop/jobs/WEATGenerator.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java b/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java index 382b8e6..bd0b6f1 100644 --- a/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java +++ b/src/main/java/org/archive/hadoop/jobs/WEATGenerator.java @@ -54,9 +54,6 @@ public class WEATGenerator extends Configured implements Tool { public final static String TOOL_DESCRIPTION = "Generate WAT and WET files from (W)ARC files stored in HDFS"; public static final Log LOG = LogFactory.getLog(WEATGenerator.class); - - // TODO: modify this class to also output cdx files. Look at hadoop plugins for fetch and/or other classes in this project for hints. - // TODO: new property for cdx paths or something is likely needed, same as fetch... public static class WEATGeneratorMapper extends MapReduceBase implements Mapper { private JobConf jobConf; @@ -93,6 +90,7 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter Path inputPath = new Path(path); Path basePath = inputPath.getParent().getParent(); + Path cdxBasePath = new Path(jobConf.get("cdxBasePath", basePath.toString())); String inputBasename = inputPath.getName(); String watOutputBasename = ""; @@ -103,19 +101,19 @@ public void map( Text key, Text value, OutputCollector output, Reporter reporter if(path.endsWith(".gz")) { watOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".wat.gz"; wetOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".wet.gz"; - cdxWatOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".cdxwat.gz"; - cdxWetOutputBasename = inputBasename.substring(0,inputBasename.length()-3) + ".cdxwet.gz"; + cdxWatOutputBasename = inputBasename.replace(".warc.gz", ".wat.cdx.gz"); + cdxWetOutputBasename = inputBasename.replace(".warc.gz", ".wet.cdx.gz"); } else { watOutputBasename = inputBasename + ".wat.gz"; wetOutputBasename = inputBasename + ".wet.gz"; - cdxWatOutputBasename = inputBasename + ".cdxwat.gz"; - cdxWetOutputBasename = inputBasename + ".cdxwet.gz"; + cdxWatOutputBasename = inputBasename.replace(".warc", ".wat.cdx.gz"); + cdxWetOutputBasename = inputBasename.replace(".warc", ".wet.cdx.gz"); } String watOutputFileString = basePath.toString() + "/wat/" + watOutputBasename; String wetOutputFileString = basePath.toString() + "/wet/" + wetOutputBasename; - String cdxWetOutputFileString = basePath.toString() + "/cdxwet/" + cdxWetOutputBasename; - String cdxWatOutputFileString = basePath.toString() + "/cdxwat/" + cdxWatOutputBasename; + String cdxWetOutputFileString = cdxBasePath.toString() + "/" + cdxWetOutputBasename; + String cdxWatOutputFileString = cdxBasePath.toString() + "/" + cdxWatOutputBasename; LOG.info("About to write out to " + watOutputFileString + " and " + wetOutputFileString); if (this.jobConf.getBoolean("skipExisting", false)) { @@ -313,6 +311,10 @@ public int run( String[] args ) throws Exception { } else if(args[arg].equals("-outputCDX")) { job.setBoolean("outputCDX", true); arg++; + } else if(args[arg].equals("-cdxBasePath")) { + job.set("cdxBasePath", args[arg+1]); + arg++; + arg++; } else { break; }