-
Notifications
You must be signed in to change notification settings - Fork 14
Zip Store #37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
brokkoli71
wants to merge
42
commits into
main
Choose a base branch
from
zip-store
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Zip Store #37
Changes from all commits
Commits
Show all changes
42 commits
Select commit
Hold shift + click to select a range
2d29a3b
add MemoryStore
brokkoli71 218f079
memorystore map string keys
brokkoli71 fa2626b
fix memorystore with List<String> keys
brokkoli71 4763778
Merge branch 'main' into memory-store
brokkoli71 36cb46a
remove code duplication and add v2.Group.setAttributes
brokkoli71 78fa912
add ConcurrentMemoryStore
brokkoli71 bd3bd7c
add tests
brokkoli71 e1f8e49
fix Group.writeMetadata
brokkoli71 50b40f1
add attributes to tests
brokkoli71 dbf7541
Merge branch 'main' into memory-store
brokkoli71 6094489
Array.create and Group.create with no store/path arguments should def…
brokkoli71 250daf1
Add Javadoc comments for Group methods
brokkoli71 261bc10
add ZipStore tests
brokkoli71 a6512de
make MemoryStore allways concurrent
brokkoli71 f650fd6
add ZipStore tests
brokkoli71 dc4dd7d
Merge remote-tracking branch 'origin/zip-store' into zip-store
brokkoli71 12a86b7
refactor and unify outputs of Store.list
brokkoli71 bce731e
read zip store
brokkoli71 135ca6d
end index validation in MemoryStore
brokkoli71 bb5bdf9
Merge branch 'main' into memory-store
brokkoli71 ff9ff99
end index validation in MemoryStore
brokkoli71 2fde2fb
Bump to 0.0.6 to trigger release
joshmoore 013a417
Remove zarr-python setup from deploy workflow
normanrz 6d25740
Bump to 0.0.7
joshmoore d422273
Bump to 0.0.8
joshmoore cf8d562
Create settings.xml
joshmoore f146d8d
Bump to 0.0.9
joshmoore f163b29
write buffer of zip store
brokkoli71 c525a63
use apache commons compress for zip file read and write
brokkoli71 c4448d5
set Zip64Mode.AsNeeded
brokkoli71 025265c
test Zipped OME-Zarr requirements
brokkoli71 7bc8b4a
Sort zarr.json files in breadth-first order within BufferedZipStore
brokkoli71 cb866a2
Merge branch 'main' into zip-store
brokkoli71 cb584e7
manually read zip comment
brokkoli71 071430f
refactor read zip comment
brokkoli71 5196562
test zip store with v2
brokkoli71 6737660
use com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream i…
brokkoli71 2625e13
add ReadOnlyZipStore
brokkoli71 7234f96
fix ReadOnlyZipStore for zips with
brokkoli71 6eb760c
Merge branch 'memory-store' into zip-store
brokkoli71 8b1e1ca
add BufferedZipStore parameter flushOnWrite
brokkoli71 a2a2c67
fix testMemoryStore
brokkoli71 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
297 changes: 297 additions & 0 deletions
297
src/main/java/dev/zarr/zarrjava/store/BufferedZipStore.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,297 @@ | ||
| package dev.zarr.zarrjava.store; | ||
|
|
||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
| import java.io.ByteArrayOutputStream; | ||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.file.Path; | ||
| import java.nio.file.Paths; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; | ||
| import org.apache.commons.compress.archivers.zip.*; | ||
|
|
||
| import java.util.zip.CRC32; | ||
| import java.util.zip.ZipEntry; // for STORED constant | ||
|
|
||
| import static dev.zarr.zarrjava.utils.ZipUtils.getZipCommentFromBuffer; | ||
|
|
||
|
|
||
| /** A Store implementation that buffers reads and writes and flushes them to an underlying Store as a zip file. | ||
| */ | ||
| public class BufferedZipStore implements Store, Store.ListableStore { | ||
|
|
||
| private final StoreHandle underlyingStore; | ||
| private final Store.ListableStore bufferStore; | ||
| private String archiveComment; | ||
| private boolean flushOnWrite; | ||
|
|
||
| private void writeBuffer() throws IOException{ | ||
| // create zip file bytes from buffer store and write to underlying store | ||
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
| try (ZipArchiveOutputStream zos = new ZipArchiveOutputStream(baos)) { | ||
| zos.setUseZip64(Zip64Mode.AsNeeded); | ||
| if (archiveComment != null) { | ||
| zos.setComment(archiveComment); | ||
| } | ||
| Stream<String[]> entries = bufferStore.list().sorted( | ||
| (a, b) -> { | ||
| boolean aIsZarr = a.length > 0 && a[a.length - 1].equals("zarr.json"); | ||
| boolean bIsZarr = b.length > 0 && b[b.length - 1].equals("zarr.json"); | ||
| // first all zarr.json files | ||
| if (aIsZarr && !bIsZarr) { | ||
| return -1; | ||
| } else if (!aIsZarr && bIsZarr) { | ||
| return 1; | ||
| } else if (aIsZarr && bIsZarr) { | ||
| // sort zarr.json in BFS order within same depth by lexicographical order | ||
| if (a.length != b.length) { | ||
| return Integer.compare(a.length, b.length); | ||
| } else { | ||
| return String.join("/", a).compareTo(String.join("/", b)); | ||
| } | ||
| } else { | ||
| // then all other files in lexicographical order | ||
| return String.join("/", a).compareTo(String.join("/", b)); | ||
| } | ||
| } | ||
| ); | ||
|
|
||
| entries.forEach(keys -> { | ||
| try { | ||
| if (keys == null || keys.length == 0) { | ||
| // skip root entry | ||
| return; | ||
| } | ||
| String entryName = String.join("/", keys); | ||
| ByteBuffer bb = bufferStore.get(keys); | ||
| if (bb == null) { | ||
| // directory entry: ensure trailing slash | ||
| if (!entryName.endsWith("/")) { | ||
| entryName = entryName + "/"; | ||
| } | ||
| ZipArchiveEntry dirEntry = new ZipArchiveEntry(entryName); | ||
| dirEntry.setMethod(ZipEntry.STORED); | ||
| dirEntry.setSize(0); | ||
| dirEntry.setCrc(0); | ||
| zos.putArchiveEntry(dirEntry); | ||
| zos.closeArchiveEntry(); | ||
| } else { | ||
| // read bytes from ByteBuffer without modifying original | ||
| ByteBuffer dup = bb.duplicate(); | ||
| int len = dup.remaining(); | ||
| byte[] bytes = new byte[len]; | ||
| dup.get(bytes); | ||
|
|
||
| // compute CRC and set size for STORED (no compression) | ||
| CRC32 crc = new CRC32(); | ||
| crc.update(bytes, 0, bytes.length); | ||
| ZipArchiveEntry fileEntry = new ZipArchiveEntry(entryName); | ||
| fileEntry.setMethod(ZipEntry.STORED); | ||
| fileEntry.setSize(bytes.length); | ||
| fileEntry.setCrc(crc.getValue()); | ||
|
|
||
| zos.putArchiveEntry(fileEntry); | ||
| zos.write(bytes); | ||
| zos.closeArchiveEntry(); | ||
| } | ||
| } catch (IOException e) { | ||
| // wrap checked exception so it can be rethrown from stream for handling below | ||
| throw new RuntimeException(e); | ||
| } | ||
| }); | ||
| zos.finish(); | ||
| } catch (RuntimeException e) { | ||
| // unwrap and rethrow IOExceptions thrown inside the lambda | ||
| if (e.getCause() instanceof IOException) { | ||
| throw (IOException) e.getCause(); | ||
| } | ||
| throw e; | ||
| } | ||
|
|
||
| byte[] zipBytes = baos.toByteArray(); | ||
| // write zip bytes back to underlying store | ||
| underlyingStore.set(ByteBuffer.wrap(zipBytes)); | ||
| } | ||
|
|
||
|
|
||
| private void loadBuffer() throws IOException{ | ||
| // read zip file bytes from underlying store and populate buffer store | ||
| ByteBuffer buffer = underlyingStore.read(); | ||
| if (buffer == null) { | ||
| return; | ||
| } | ||
| byte[] bufArray; | ||
| if (buffer.hasArray()) { | ||
| bufArray = buffer.array(); | ||
| } else { | ||
| bufArray = new byte[buffer.remaining()]; | ||
| buffer.duplicate().get(bufArray); | ||
| } | ||
| this.archiveComment = getZipCommentFromBuffer(bufArray); | ||
| try (ZipArchiveInputStream zis = new ZipArchiveInputStream(new ByteBufferBackedInputStream(buffer))) { | ||
| ZipArchiveEntry entry; | ||
| while ((entry = zis.getNextEntry()) != null) { | ||
| if (entry.isDirectory()) { | ||
| continue; | ||
| } | ||
| ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
| byte[] tmp = new byte[8192]; | ||
| int read; | ||
| while ((read = zis.read(tmp)) != -1) { | ||
| baos.write(tmp, 0, read); | ||
| } | ||
| byte[] bytes = baos.toByteArray(); | ||
| bufferStore.set(new String[]{entry.getName()}, ByteBuffer.wrap(bytes)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore, @Nullable String archiveComment, boolean flushOnWrite) { | ||
| this.underlyingStore = underlyingStore; | ||
| this.bufferStore = bufferStore; | ||
| this.archiveComment = archiveComment; | ||
| this.flushOnWrite = flushOnWrite; | ||
| try { | ||
| loadBuffer(); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Failed to load buffer from underlying store", e); | ||
| } | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore, @Nullable String archiveComment) { | ||
| this(underlyingStore, bufferStore, archiveComment, true); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore) { | ||
| this(underlyingStore, bufferStore, null); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull StoreHandle underlyingStore, String archiveComment) { | ||
| this(underlyingStore, new MemoryStore(), archiveComment); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull StoreHandle underlyingStore) { | ||
| this(underlyingStore, (String) null); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull Path underlyingStore, String archiveComment) { | ||
| this(new FilesystemStore(underlyingStore.getParent()).resolve(underlyingStore.getFileName().toString()), archiveComment); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull Path underlyingStore) { | ||
| this(underlyingStore, null); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull String underlyingStorePath, String archiveComment) { | ||
| this(Paths.get(underlyingStorePath), archiveComment); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull String underlyingStorePath) { | ||
| this(underlyingStorePath, null); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull StoreHandle underlyingStore, @Nonnull Store.ListableStore bufferStore, boolean flushOnWrite) { | ||
| this(underlyingStore, bufferStore, null, flushOnWrite); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull StoreHandle underlyingStore, String archiveComment, boolean flushOnWrite) { | ||
| this(underlyingStore, new MemoryStore(), archiveComment, flushOnWrite); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull StoreHandle underlyingStore, boolean flushOnWrite) { | ||
| this(underlyingStore, (String) null, flushOnWrite); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull Path underlyingStore, String archiveComment, boolean flushOnWrite) { | ||
| this(new FilesystemStore(underlyingStore.getParent()).resolve(underlyingStore.getFileName().toString()), archiveComment, flushOnWrite); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull Path underlyingStore, boolean flushOnWrite) { | ||
| this(underlyingStore, null, flushOnWrite); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull String underlyingStorePath, String archiveComment, boolean flushOnWrite) { | ||
| this(Paths.get(underlyingStorePath), archiveComment, flushOnWrite); | ||
| } | ||
|
|
||
| public BufferedZipStore(@Nonnull String underlyingStorePath, boolean flushOnWrite) { | ||
| this(underlyingStorePath, null, flushOnWrite); | ||
| } | ||
|
|
||
|
|
||
| /** | ||
| * Flushes the buffer and archiveComment to the underlying store as a zip file. | ||
| */ | ||
| public void flush() throws IOException { | ||
| writeBuffer(); | ||
| } | ||
|
|
||
| public String getArchiveComment() { | ||
| return archiveComment; | ||
| } | ||
|
|
||
| @Override | ||
| public Stream<String[]> list(String[] keys) { | ||
| return bufferStore.list(keys); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean exists(String[] keys) { | ||
| return bufferStore.exists(keys); | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public ByteBuffer get(String[] keys) { | ||
| return bufferStore.get(keys); | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public ByteBuffer get(String[] keys, long start) { | ||
| return bufferStore.get(keys, start); | ||
| } | ||
|
|
||
| @Nullable | ||
| @Override | ||
| public ByteBuffer get(String[] keys, long start, long end) { | ||
| return bufferStore.get(keys, start, end); | ||
| } | ||
|
|
||
| @Override | ||
| public void set(String[] keys, ByteBuffer bytes) { | ||
| bufferStore.set(keys, bytes); | ||
| if (flushOnWrite) { | ||
| try { | ||
| writeBuffer(); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Failed to flush buffer to underlying store after set operation", e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public void delete(String[] keys) { | ||
| bufferStore.delete(keys); | ||
| if (flushOnWrite) { | ||
| try { | ||
| writeBuffer(); | ||
| } catch (IOException e) { | ||
| throw new RuntimeException("Failed to flush buffer to underlying store after delete operation", e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| @Nonnull | ||
| @Override | ||
| public StoreHandle resolve(String... keys) { | ||
| return new StoreHandle(this, keys); | ||
| } | ||
|
|
||
| @Override | ||
| public String toString() { | ||
| return "BufferedZipStore(" + underlyingStore.toString() + ")"; | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine for now, but it creates copies of the entire zip archive in memory, so it could be expensive. Maybe we could add a streaming-write option the the store interface in the future.