diff --git a/blob/pom.xml b/blob/pom.xml
index 6a1161cb19..3a6c11bb24 100644
--- a/blob/pom.xml
+++ b/blob/pom.xml
@@ -63,6 +63,10 @@
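             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+        </dependency>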
diff --git a/blob/src/main/java/com/bazaarvoice/emodb/blob/BlobStoreModule.java b/blob/src/main/java/com/bazaarvoice/emodb/blob/BlobStoreModule.java
index cc77c64411..037234748e 100644
--- a/blob/src/main/java/com/bazaarvoice/emodb/blob/BlobStoreModule.java
+++ b/blob/src/main/java/com/bazaarvoice/emodb/blob/BlobStoreModule.java
@@ -4,6 +4,7 @@
import com.bazaarvoice.emodb.blob.core.BlobStoreProviderProxy;
import com.bazaarvoice.emodb.blob.core.DefaultBlobStore;
import com.bazaarvoice.emodb.blob.core.LocalBlobStore;
+import com.bazaarvoice.emodb.blob.core.S3BlobStore;
import com.bazaarvoice.emodb.blob.core.SystemBlobStore;
import com.bazaarvoice.emodb.blob.db.StorageProvider;
import com.bazaarvoice.emodb.blob.db.astyanax.AstyanaxStorageProvider;
@@ -197,8 +198,8 @@ protected void configure() {
}
// Bind the BlobStore instance that the rest of the application will consume
- bind(DefaultBlobStore.class).asEagerSingleton();
- bind(BlobStore.class).annotatedWith(LocalBlobStore.class).to(DefaultBlobStore.class);
+ bind(S3BlobStore.class).asEagerSingleton();
+ bind(BlobStore.class).annotatedWith(LocalBlobStore.class).to(S3BlobStore.class);
expose(BlobStore.class);
// Bind any methods annotated with @ParameterizedTimed
diff --git a/blob/src/main/java/com/bazaarvoice/emodb/blob/core/S3BlobStore.java b/blob/src/main/java/com/bazaarvoice/emodb/blob/core/S3BlobStore.java
new file mode 100644
index 0000000000..ae27baaab0
--- /dev/null
+++ b/blob/src/main/java/com/bazaarvoice/emodb/blob/core/S3BlobStore.java
@@ -0,0 +1,370 @@
+package com.bazaarvoice.emodb.blob.core;
+
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.util.IOUtils;
+import com.bazaarvoice.emodb.blob.api.Blob;
+import com.bazaarvoice.emodb.blob.api.BlobMetadata;
+import com.bazaarvoice.emodb.blob.api.BlobNotFoundException;
+import com.bazaarvoice.emodb.blob.api.BlobStore;
+import com.bazaarvoice.emodb.blob.api.DefaultBlob;
+import com.bazaarvoice.emodb.blob.api.DefaultBlobMetadata;
+import com.bazaarvoice.emodb.blob.api.DefaultTable;
+import com.bazaarvoice.emodb.blob.api.Names;
+import com.bazaarvoice.emodb.blob.api.Range;
+import com.bazaarvoice.emodb.blob.api.RangeNotSatisfiableException;
+import com.bazaarvoice.emodb.blob.api.RangeSpecification;
+import com.bazaarvoice.emodb.blob.api.StreamSupplier;
+import com.bazaarvoice.emodb.blob.api.Table;
+import com.bazaarvoice.emodb.common.api.impl.LimitCounter;
+import com.bazaarvoice.emodb.sor.api.Audit;
+import com.bazaarvoice.emodb.sor.api.TableExistsException;
+import com.bazaarvoice.emodb.sor.api.TableOptions;
+import com.bazaarvoice.emodb.sor.api.UnknownTableException;
+import com.bazaarvoice.emodb.table.db.TableDAO;
+import com.google.common.base.Objects;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Maps;
+import com.google.inject.Inject;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.DigestInputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import jersey.repackaged.com.google.common.base.Throwables;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+public class S3BlobStore implements BlobStore {
+
+ /** Table DAO consulted for every table lookup, listing and mutation in this store. */
+ private final TableDAO _tableDao;
+ // NOTE(review): the client is built inline and pinned to US_EAST_1 rather than injected/configured -- confirm this is intended.
+ private final AmazonS3 _s3 = AmazonS3ClientBuilder.standard().withRegion(Regions.US_EAST_1).build();
+
+ /**
+  * Creates a blob store that keeps table definitions in the supplied DAO and blob content in S3.
+  *
+  * @param tableDAO DAO used for table definitions; must not be null
+  * @throws NullPointerException if {@code tableDAO} is null
+  */
+ @Inject
+ public S3BlobStore(TableDAO tableDAO) {
+     // Fail at wiring time instead of with an NPE on the first request.
+     _tableDao = checkNotNull(tableDAO, "tableDAO");
+ }
+
+ /**
+  * Lists the non-internal tables known to the table DAO.
+  *
+  * @param fromTableExclusive passed through to the DAO as the exclusive starting table name; may be null
+  * @param limit upper bound handed to the {@link LimitCounter} that wraps the result; must be positive
+  * @return iterator of blob-API tables, with internal tables filtered out
+  * @throws IllegalArgumentException if {@code limit} is not positive
+  */
+ @Override
+ public Iterator<Table> listTables(@Nullable String fromTableExclusive, long limit) {
+     checkArgument(limit > 0, "Limit must be >0");
+
+     LimitCounter remaining = new LimitCounter(limit);
+     final Iterator<com.bazaarvoice.emodb.table.db.Table> tableIter = _tableDao.list(fromTableExclusive, remaining);
+     return remaining.limit(new AbstractIterator<Table>() {
+         @Override
+         protected Table computeNext() {
+             // Skip internal tables; only user-visible tables are exposed through the blob API.
+             while (tableIter.hasNext()) {
+                 com.bazaarvoice.emodb.table.db.Table table = tableIter.next();
+                 if (!table.isInternal()) {
+                     return toDefaultTable(table);
+                 }
+             }
+             return endOfData();
+         }
+     });
+ }
+
+ /**
+  * Converts a table-DAO table into the blob API's table representation.
+  *
+  * @param table table as returned by the table DAO
+  * @return a {@link DefaultTable} carrying the same name, options, attributes and availability
+  */
+ private com.bazaarvoice.emodb.blob.api.Table toDefaultTable(com.bazaarvoice.emodb.table.db.Table table) {
+     // Reuse the shared attribute coercion instead of repeating the raw-map cast here.
+     return new DefaultTable(table.getName(), table.getOptions(), getAttributes(table), table.getAvailability());
+ }
+
+ /**
+  * Creates a table through the table DAO after validating every argument.
+  *
+  * @param table table name; must satisfy {@link Names#isLegalTableName}
+  * @param options table options; must not be null
+  * @param attributes table attributes; must not be null and must contain only string keys and values
+  * @param audit audit record passed to the DAO; must not be null
+  * @throws TableExistsException declared for the DAO's create call
+  */
+ @Override
+ public void createTable(String table, TableOptions options, Map<String, String> attributes, Audit audit) throws TableExistsException {
+     checkLegalTableName(table);
+     checkNotNull(options, "options");
+     checkNotNull(attributes, "attributes");
+     checkMapOfStrings(attributes, "attributes");  // Defensive check that generic type restrictions aren't bypassed
+     checkNotNull(audit, "audit");
+     _tableDao.create(table, options, attributes, audit);
+ }
+
+ /**
+  * Drops a table by delegating to the table DAO.
+  *
+  * @param table name of the table to drop; must be a legal table name
+  * @param audit audit record forwarded to the DAO; must not be null
+  * @throws UnknownTableException declared for the DAO's drop call
+  */
+ @Override
+ public void dropTable(String table, Audit audit) throws UnknownTableException {
+     // Validate in the same order as the other mutators: name first, then audit.
+     checkLegalTableName(table);
+     final Audit dropAudit = checkNotNull(audit, "audit");
+     _tableDao.drop(table, dropAudit);
+ }
+
+ @Override
+ public void purgeTableUnsafe(String table, Audit audit) throws UnknownTableException {
+
+ }
+
+ /**
+  * Reports whether the table DAO knows a table with the given name.
+  *
+  * @param table table name; must be a legal table name
+  * @return the DAO's answer for {@code table}
+  */
+ @Override
+ public boolean getTableExists(String table) {
+     checkLegalTableName(table);
+     final boolean known = _tableDao.exists(table);
+     return known;
+ }
+
+ /**
+  * Reports whether the table currently has availability information.
+  *
+  * @param table table name; must be a legal table name
+  * @return {@code true} when the DAO's table has a non-null availability
+  */
+ @Override
+ public boolean isTableAvailable(String table) {
+     checkLegalTableName(table);
+     final com.bazaarvoice.emodb.table.db.Table resolved = _tableDao.get(table);
+     return resolved.getAvailability() != null;
+ }
+
+ /**
+  * Returns the blob-API view of a table.
+  *
+  * @param table table name; must be a legal table name
+  * @return the DAO's table converted with {@code toDefaultTable}
+  */
+ @Override
+ public com.bazaarvoice.emodb.blob.api.Table getTableMetadata(String table) {
+     checkLegalTableName(table);
+     final com.bazaarvoice.emodb.table.db.Table resolved = _tableDao.get(table);
+     return toDefaultTable(resolved);
+ }
+
+ /**
+  * Returns the attributes stored on a table.
+  *
+  * @param table table name; must be a legal table name
+  * @return the table's attributes as a string-to-string map
+  * @throws UnknownTableException declared for the DAO lookup
+  */
+ @Override
+ public Map<String, String> getTableAttributes(String table) throws UnknownTableException {
+     checkLegalTableName(table);
+     return getAttributes(_tableDao.get(table));
+ }
+
+ /**
+  * Replaces a table's attributes through the table DAO after validating every argument.
+  *
+  * @param table table name; must be a legal table name
+  * @param attributes new attributes; must not be null and must contain only string keys and values
+  * @param audit audit record forwarded to the DAO; must not be null
+  * @throws UnknownTableException declared for the DAO call
+  */
+ @Override
+ public void setTableAttributes(String table, Map<String, String> attributes, Audit audit) throws UnknownTableException {
+     checkLegalTableName(table);
+     checkNotNull(attributes, "attributes");
+     checkMapOfStrings(attributes, "attributes");  // Defensive check that generic type restrictions aren't bypassed
+     checkNotNull(audit, "audit");
+     _tableDao.setAttributes(table, attributes, audit);
+ }
+
+ /**
+  * Returns the options a table was created with.
+  *
+  * @param table table name; must be a legal table name
+  * @return the options reported by the DAO's table
+  * @throws UnknownTableException declared for the DAO lookup
+  */
+ @Override
+ public TableOptions getTableOptions(String table) throws UnknownTableException {
+     checkLegalTableName(table);
+     final com.bazaarvoice.emodb.table.db.Table resolved = _tableDao.get(table);
+     return resolved.getOptions();
+ }
+
+ @Override
+ public long getTableApproximateSize(String table) throws UnknownTableException {
+ return 0;
+ }
+
+ /**
+  * Fetches the metadata of a single blob with an S3 metadata request.
+  *
+  * @param tableName table name; must be a legal table name
+  * @param blobId blob id; must be a legal blob id
+  * @return metadata assembled from the S3 object metadata and the table's attributes
+  * @throws BlobNotFoundException if S3 answers the metadata request with HTTP 404
+  */
+ @Override
+ public BlobMetadata getMetadata(String tableName, String blobId) throws BlobNotFoundException {
+     checkLegalTableName(tableName);
+     checkLegalBlobId(blobId);
+     com.bazaarvoice.emodb.table.db.Table table = _tableDao.get(tableName);
+
+     // NOTE(review): the key's first segment comes from the Table object's toString() -- confirm it equals the table name.
+     String path = UriBuilder.fromPath("{arg1}").path("{arg2}").build(table, blobId).toASCIIString();
+
+     ObjectMetadata metadata;
+     try {
+         metadata = _s3.getObjectMetadata("bv-emodb-local-audit", path);
+     } catch (AmazonS3Exception e) {
+         if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) {
+             // Report the missing blob directly rather than routing a null sentinel through newMetadata().
+             throw new BlobNotFoundException(blobId);
+         }
+         // AmazonS3Exception is unchecked, so it can be rethrown as-is without Throwables.propagate.
+         throw e;
+     }
+
+     return newMetadata(table, blobId, metadata);
+ }
+
+ /**
+  * Scans blob metadata for a table by paging through the S3 key listing.
+  *
+  * @param tableName table name; must be a legal table name
+  * @param fromBlobIdExclusive blob id to start after, or null to start at the first key of the table
+  * @param limit maximum number of blobs to return; must be positive
+  * @return iterator that lists keys lazily, at most 1000 per S3 request, and resolves metadata per key
+  * @throws IllegalArgumentException if the table name, the blob id or the limit is invalid
+  */
+ @Override
+ public Iterator<BlobMetadata> scanMetadata(String tableName, @Nullable String fromBlobIdExclusive, long limit) {
+     checkLegalTableName(tableName);
+     checkArgument(fromBlobIdExclusive == null || Names.isLegalBlobId(fromBlobIdExclusive), "fromBlobIdExclusive");
+     checkArgument(limit > 0, "Limit must be >0");
+     com.bazaarvoice.emodb.table.db.Table table = _tableDao.get(tableName);
+
+     // Blob keys are written as "<table>/<blobId>". Without the trailing slash the prefix would also
+     // match other tables whose names merely start with this table's name.
+     final String keyPrefix = tableName + "/";
+     // StartAfter compares against complete keys, so the blob id has to be expanded with the same
+     // template that is used when the key is written.
+     final String startAfterKey = fromBlobIdExclusive == null
+             ? null
+             : UriBuilder.fromPath("{arg1}").path("{arg2}").build(tableName, fromBlobIdExclusive).toASCIIString();
+
+     Iterator<S3ObjectSummary> summaryIterator = new AbstractIterator<S3ObjectSummary>() {
+         private Iterator<S3ObjectSummary> _page = java.util.Collections.<S3ObjectSummary>emptyIterator();
+         // Kept as a long: narrowing the caller's limit to int overflows for values above Integer.MAX_VALUE.
+         private long _remaining = limit;
+         private String _continuationToken;
+         private boolean _listingExhausted;
+
+         @Override
+         protected S3ObjectSummary computeNext() {
+             while (_remaining > 0) {
+                 if (_page.hasNext()) {
+                     _remaining--;
+                     return _page.next();
+                 }
+                 if (_listingExhausted) {
+                     break;
+                 }
+
+                 ListObjectsV2Request request = new ListObjectsV2Request()
+                         .withBucketName("bv-emodb-local-audit")
+                         .withPrefix(keyPrefix)
+                         .withStartAfter(startAfterKey)
+                         .withMaxKeys((int) Math.min(_remaining, 1000L));
+                 if (_continuationToken != null) {
+                     request.setContinuationToken(_continuationToken);
+                 }
+
+                 ListObjectsV2Result result = _s3.listObjectsV2(request);
+                 _page = result.getObjectSummaries().iterator();
+                 _continuationToken = result.getNextContinuationToken();
+                 // No continuation token means this was the last page of the listing.
+                 _listingExhausted = _continuationToken == null;
+             }
+             return endOfData();
+         }
+     };
+
+     return new MetadataIterator(summaryIterator, table);
+ }
+
+ /**
+  * Adapts an iterator of S3 key summaries into blob metadata, issuing one S3 metadata request per key.
+  * Non-static because it uses the enclosing store's S3 client and {@code newMetadata}.
+  */
+ private class MetadataIterator extends AbstractIterator<BlobMetadata> {
+     private final Iterator<S3ObjectSummary> _summaryIterator;
+     private final com.bazaarvoice.emodb.table.db.Table _table;
+
+     /**
+      * @param summaryIterator source of S3 key summaries; must not be null
+      * @param table table whose attributes are merged into every result; must not be null
+      */
+     public MetadataIterator(Iterator<S3ObjectSummary> summaryIterator, com.bazaarvoice.emodb.table.db.Table table) {
+         _summaryIterator = checkNotNull(summaryIterator);
+         _table = checkNotNull(table);
+     }
+
+     @Override
+     protected BlobMetadata computeNext() {
+         if (!_summaryIterator.hasNext()) {
+             return endOfData();
+         }
+         S3ObjectSummary summary = _summaryIterator.next();
+         String key = summary.getKey();
+         // The blob id is whatever follows the last '/' of the key.
+         String blobId = key.substring(key.lastIndexOf('/') + 1);
+         // The listing carries no user metadata, hence the additional request per object.
+         ObjectMetadata objectMetadata = _s3.getObjectMetadata(summary.getBucketName(), key);
+         return newMetadata(_table, blobId, objectMetadata);
+     }
+ }
+
+ /**
+  * Retrieves a whole blob; equivalent to the ranged overload with no range specification.
+  *
+  * @param table table name
+  * @param blobId blob id
+  * @return the blob as returned by the ranged overload
+  * @throws BlobNotFoundException declared for the ranged overload
+  */
+ @Override
+ public Blob get(String table, String blobId) throws BlobNotFoundException {
+     final RangeSpecification noRange = null;
+     return get(table, blobId, noRange);
+ }
+
+ @Override
+ public Blob get(String tableName, String blobId, @Nullable RangeSpecification rangeSpec) throws BlobNotFoundException, RangeNotSatisfiableException {
+ checkLegalTableName(tableName);
+ checkLegalBlobId(blobId);
+
+ final com.bazaarvoice.emodb.table.db.Table table = _tableDao.get(tableName);
+ String path = UriBuilder.fromPath("{arg1}").path("{arg2}").build(table, blobId).toASCIIString();
+
+ BlobMetadata metadata = getMetadata(tableName, blobId);
+
+ final Range range;
+ if (rangeSpec != null) {
+ range = rangeSpec.getRange(metadata.getLength());
+ // Satisfiable range requests must return at least one byte (per HTTP spec).
+ checkArgument(range.getOffset() >= 0 && range.getLength() > 0 &&
+ range.getOffset() + range.getLength() <= metadata.getLength(), "Invalid byte range: %s", rangeSpec);
+ } else {
+ // If no range is specified, return the entire entity. This may return zero bytes.
+ range = new Range(0, metadata.getLength());
+ }
+
+ GetObjectRequest rangeObjectRequest = new GetObjectRequest("bv-emodb-local-audit", path)
+ .withRange(range.getOffset(), range.getOffset() + range.getLength());
+
+ S3Object object = _s3.getObject(rangeObjectRequest);
+ S3ObjectInputStream objectInputStream = object.getObjectContent();
+
+ return new DefaultBlob(metadata, range, new StreamSupplier() {
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ IOUtils.copy(objectInputStream, out);
+ }
+ });
+ }
+
+ /**
+  * Builds blob metadata from S3 object metadata plus the owning table's attributes.
+  *
+  * @param table table the blob belongs to; its attributes are merged into the result
+  * @param blobId id of the blob
+  * @param s S3 object metadata, or null when the object does not exist
+  * @return metadata whose attributes are the S3 user metadata overlaid with the table attributes
+  * @throws BlobNotFoundException if {@code s} is null
+  */
+ private BlobMetadata newMetadata(com.bazaarvoice.emodb.table.db.Table table, String blobId, ObjectMetadata s) {
+     if (s == null) {
+         throw new BlobNotFoundException(blobId);
+     }
+     Map<String, String> attributes = Maps.newTreeMap();
+     attributes.putAll(s.getUserMetadata());
+     // Table attributes are added last, so they win over user metadata with the same key.
+     attributes.putAll(getAttributes(table));
+     // S3 already reports the modification time as a Date; no unit conversion is involved.
+     Date timestamp = s.getLastModified();
+     // NOTE(review): the S3 Content-MD5 and ETag are passed as the fourth and fifth arguments -- presumably the MD5 and SHA-1 slots; confirm callers can handle that.
+     return new DefaultBlobMetadata(blobId, timestamp, s.getContentLength(), s.getContentMD5(), s.getETag(), attributes);
+ }
+
+ @Override
+ public void put(String tableName, String blobId, Supplier extends InputStream> in, Map attributes, @Nullable Duration ttl) throws IOException {
+ checkLegalTableName(tableName);
+ checkLegalBlobId(blobId);
+ checkNotNull(in, "in");
+ checkNotNull(attributes, "attributes");
+
+ com.bazaarvoice.emodb.table.db.Table table = _tableDao.get(tableName);
+
+ long timestamp = System.currentTimeMillis();
+ String path = UriBuilder.fromPath("{arg1}").path("{arg2}").build(table, blobId).toASCIIString();
+
+ // TODO: using the s3 api that we are currently using, we are unable to set teh md5 hash on the objectMetadata like we need to
+// DigestInputStream md5In = new DigestInputStream(in.get(), getMessageDigest("MD5"));
+// DigestInputStream sha1In = new DigestInputStream(md5In, getMessageDigest("SHA-1"));
+
+ ObjectMetadata objectMetadata = new ObjectMetadata();
+ objectMetadata.setUserMetadata(attributes);
+
+ _s3.putObject("bv-emodb-local-audit", path, in.get(), objectMetadata);
+ }
+
+ /**
+  * Deletes a single blob's object from S3.
+  *
+  * @param tableName table name; must be a legal table name
+  * @param blobId blob id; must be a legal blob id
+  */
+ @Override
+ public void delete(String tableName, String blobId) {
+     checkLegalTableName(tableName);
+     checkLegalBlobId(blobId);
+
+     final com.bazaarvoice.emodb.table.db.Table owner = _tableDao.get(tableName);
+     // Same key template as the other blob operations: first path segment from the table, second from the blob id.
+     final UriBuilder keyTemplate = UriBuilder.fromPath("{arg1}").path("{arg2}");
+     final String objectKey = keyTemplate.build(owner, blobId).toASCIIString();
+
+     _s3.deleteObject("bv-emodb-local-audit", objectKey);
+ }
+
+ /**
+  * Returns the placements available for blob tables.
+  *
+  * @return an empty collection; this store does not report placements yet, and it no longer returns
+  *         {@code null}, which callers would have had to guard against
+  */
+ @Override
+ public Collection<String> getTablePlacements() {
+     // TODO: report the real placements once this store can obtain them.
+     return java.util.Collections.<String>emptyList();
+ }
+
+ /**
+  * Returns a table's attributes viewed as a string-to-string map.
+  *
+  * @param table table as returned by the table DAO
+  * @return the same map instance the table exposes, cast to {@code Map<String, String>}
+  */
+ @SuppressWarnings("unchecked")
+ private Map<String, String> getAttributes(com.bazaarvoice.emodb.table.db.Table table) {
+     // Coerce the table's attribute map to Map<String, String>; createTable/setTableAttributes
+     // reject non-string entries via checkMapOfStrings.
+     return (Map) table.getAttributes();
+ }
+
+ /**
+  * Verifies at runtime that every key and every value of a map is a {@link String}.
+  *
+  * @param map map to inspect; must not be null
+  * @param message error message used when a non-string key or value is found
+  * @throws IllegalArgumentException if any key or value is not a string (null entries included)
+  */
+ private void checkMapOfStrings(Map<?, ?> map, String message) {
+     for (Map.Entry<?, ?> entry : map.entrySet()) {
+         checkArgument(entry.getKey() instanceof String, message);
+         checkArgument(entry.getValue() instanceof String, message);
+     }
+ }
+
+ private static MessageDigest getMessageDigest(String algorithmName) {
+ try {
+ return MessageDigest.getInstance(algorithmName);
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void checkLegalTableName(String table) {
+ checkArgument(Names.isLegalTableName(table),
+ "Table name must be a lowercase ASCII string between 1 and 255 characters in length. " +
+ "Allowed punctuation characters are -.:@_ and the table name may not start with a single underscore character. " +
+ "An example of a valid table name would be 'photo:testcustomer'.");
+ }
+
+ private void checkLegalBlobId(String blobId) {
+ checkArgument(Names.isLegalBlobId(blobId),
+ "Blob IDs must be ASCII strings between 1 and 255 characters in length. " +
+ "Whitespace, ISO control characters and certain punctuation characters that aren't generally allowed in file names are excluded.");
+ }
+}
diff --git a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java
index cbb25ebb36..b868478409 100644
--- a/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java
+++ b/web/src/main/java/com/bazaarvoice/emodb/web/resources/blob/BlobStoreResource1.java
@@ -326,7 +326,7 @@ private void setHeaders(Response.ResponseBuilder response, BlobMetadata metadata
response.lastModified(metadata.getTimestamp());
// Put the MD5 in Content-MD5 (http spec says must be base64), SHA1 hash in ETAG (use hex)
- response.header(com.google.common.net.HttpHeaders.CONTENT_MD5, hexToBase64(metadata.getMD5()));
+ response.header(com.google.common.net.HttpHeaders.CONTENT_MD5, metadata.getMD5());
response.header(HttpHeaders.ETAG, '"' + metadata.getSHA1() + '"');
// Default to a binary "Content-Type" header.