diff --git a/LICENSE b/LICENSE deleted file mode 100644 index 57bc88a..0000000 --- a/LICENSE +++ /dev/null @@ -1,202 +0,0 @@ - Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/ - - TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION - - 1. Definitions. - - "License" shall mean the terms and conditions for use, reproduction, - and distribution as defined by Sections 1 through 9 of this document. - - "Licensor" shall mean the copyright owner or entity authorized by - the copyright owner that is granting the License. - - "Legal Entity" shall mean the union of the acting entity and all - other entities that control, are controlled by, or are under common - control with that entity. For the purposes of this definition, - "control" means (i) the power, direct or indirect, to cause the - direction or management of such entity, whether by contract or - otherwise, or (ii) ownership of fifty percent (50%) or more of the - outstanding shares, or (iii) beneficial ownership of such entity. - - "You" (or "Your") shall mean an individual or Legal Entity - exercising permissions granted by this License. - - "Source" form shall mean the preferred form for making modifications, - including but not limited to software source code, documentation - source, and configuration files. - - "Object" form shall mean any form resulting from mechanical - transformation or translation of a Source form, including but - not limited to compiled object code, generated documentation, - and conversions to other media types. - - "Work" shall mean the work of authorship, whether in Source or - Object form, made available under the License, as indicated by a - copyright notice that is included in or attached to the work - (an example is provided in the Appendix below). - - "Derivative Works" shall mean any work, whether in Source or Object - form, that is based on (or derived from) the Work and for which the - editorial revisions, annotations, elaborations, or other modifications - represent, as a whole, an original work of authorship. For the purposes - of this License, Derivative Works shall not include works that remain - separable from, or merely link (or bind by name) to the interfaces of, - the Work and Derivative Works thereof. - - "Contribution" shall mean any work of authorship, including - the original version of the Work and any modifications or additions - to that Work or Derivative Works thereof, that is intentionally - submitted to Licensor for inclusion in the Work by the copyright owner - or by an individual or Legal Entity authorized to submit on behalf of - the copyright owner. For the purposes of this definition, "submitted" - means any form of electronic, verbal, or written communication sent - to the Licensor or its representatives, including but not limited to - communication on electronic mailing lists, source code control systems, - and issue tracking systems that are managed by, or on behalf of, the - Licensor for the purpose of discussing and improving the Work, but - excluding communication that is conspicuously marked or otherwise - designated in writing by the copyright owner as "Not a Contribution." - - "Contributor" shall mean Licensor and any individual or Legal Entity - on behalf of whom a Contribution has been received by Licensor and - subsequently incorporated within the Work. - - 2. Grant of Copyright License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - copyright license to reproduce, prepare Derivative Works of, - publicly display, publicly perform, sublicense, and distribute the - Work and such Derivative Works in Source or Object form. - - 3. Grant of Patent License. Subject to the terms and conditions of - this License, each Contributor hereby grants to You a perpetual, - worldwide, non-exclusive, no-charge, royalty-free, irrevocable - (except as stated in this section) patent license to make, have made, - use, offer to sell, sell, import, and otherwise transfer the Work, - where such license applies only to those patent claims licensable - by such Contributor that are necessarily infringed by their - Contribution(s) alone or by combination of their Contribution(s) - with the Work to which such Contribution(s) was submitted. If You - institute patent litigation against any entity (including a - cross-claim or counterclaim in a lawsuit) alleging that the Work - or a Contribution incorporated within the Work constitutes direct - or contributory patent infringement, then any patent licenses - granted to You under this License for that Work shall terminate - as of the date such litigation is filed. - - 4. Redistribution. You may reproduce and distribute copies of the - Work or Derivative Works thereof in any medium, with or without - modifications, and in Source or Object form, provided that You - meet the following conditions: - - (a) You must give any other recipients of the Work or - Derivative Works a copy of this License; and - - (b) You must cause any modified files to carry prominent notices - stating that You changed the files; and - - (c) You must retain, in the Source form of any Derivative Works - that You distribute, all copyright, patent, trademark, and - attribution notices from the Source form of the Work, - excluding those notices that do not pertain to any part of - the Derivative Works; and - - (d) If the Work includes a "NOTICE" text file as part of its - distribution, then any Derivative Works that You distribute must - include a readable copy of the attribution notices contained - within such NOTICE file, excluding those notices that do not - pertain to any part of the Derivative Works, in at least one - of the following places: within a NOTICE text file distributed - as part of the Derivative Works; within the Source form or - documentation, if provided along with the Derivative Works; or, - within a display generated by the Derivative Works, if and - wherever such third-party notices normally appear. The contents - of the NOTICE file are for informational purposes only and - do not modify the License. You may add Your own attribution - notices within Derivative Works that You distribute, alongside - or as an addendum to the NOTICE text from the Work, provided - that such additional attribution notices cannot be construed - as modifying the License. - - You may add Your own copyright statement to Your modifications and - may provide additional or different license terms and conditions - for use, reproduction, or distribution of Your modifications, or - for any such Derivative Works as a whole, provided Your use, - reproduction, and distribution of the Work otherwise complies with - the conditions stated in this License. - - 5. Submission of Contributions. Unless You explicitly state otherwise, - any Contribution intentionally submitted for inclusion in the Work - by You to the Licensor shall be under the terms and conditions of - this License, without any additional terms or conditions. - Notwithstanding the above, nothing herein shall supersede or modify - the terms of any separate license agreement you may have executed - with Licensor regarding such Contributions. - - 6. Trademarks. This License does not grant permission to use the trade - names, trademarks, service marks, or product names of the Licensor, - except as required for reasonable and customary use in describing the - origin of the Work and reproducing the content of the NOTICE file. - - 7. Disclaimer of Warranty. Unless required by applicable law or - agreed to in writing, Licensor provides the Work (and each - Contributor provides its Contributions) on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - implied, including, without limitation, any warranties or conditions - of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A - PARTICULAR PURPOSE. You are solely responsible for determining the - appropriateness of using or redistributing the Work and assume any - risks associated with Your exercise of permissions under this License. - - 8. Limitation of Liability. In no event and under no legal theory, - whether in tort (including negligence), contract, or otherwise, - unless required by applicable law (such as deliberate and grossly - negligent acts) or agreed to in writing, shall any Contributor be - liable to You for damages, including any direct, indirect, special, - incidental, or consequential damages of any character arising as a - result of this License or out of the use or inability to use the - Work (including but not limited to damages for loss of goodwill, - work stoppage, computer failure or malfunction, or any and all - other commercial damages or losses), even if such Contributor - has been advised of the possibility of such damages. - - 9. Accepting Warranty or Additional Liability. While redistributing - the Work or Derivative Works thereof, You may choose to offer, - and charge a fee for, acceptance of support, warranty, indemnity, - or other liability obligations and/or rights consistent with this - License. However, in accepting such obligations, You may act only - on Your own behalf and on Your sole responsibility, not on behalf - of any other Contributor, and only if You agree to indemnify, - defend, and hold each Contributor harmless for any liability - incurred by, or claims asserted against, such Contributor by reason - of your accepting any such warranty or additional liability. - - END OF TERMS AND CONDITIONS - - APPENDIX: How to apply the Apache License to your work. - - To apply the Apache License to your work, attach the following - boilerplate notice, with the fields enclosed by brackets "[]" - replaced with your own identifying information. (Don't include - the brackets!) The text should be enclosed in the appropriate - comment syntax for the file format. We also recommend that a - file or class name and description of purpose be included on the - same "printed page" as the copyright notice for easier - identification within third-party archives. - - Copyright [yyyy] [name of copyright owner] - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - diff --git a/README.md b/README.md index 60f40d2..94e9ec8 100644 --- a/README.md +++ b/README.md @@ -1,156 +1,7 @@ -# data Artisans Streaming Ledger +# Ververica Stream Ledger -### Serializable ACID transactions on streaming data - -data Artisans Streaming Ledger is a library on top of [Apache Flink](https://flink.apache.org/), for processing event streams across multiple shared states/tables with Serializable ACID Semantics. - -Instead of operating on a single key in a single operator at a time (like in vanilla Apache Flink and other stream processors) data Artisans Streaming Ledger allows you to define a set of states, connect streams of events that drive the transactions, and apply flexible business logic that operates transactionally across those states. - -## This repository contains the following `maven` modules: -* `da-streamingledger-sdk` - The `SDK` needed to define a streaming ledger application. -* `da-streamingledger-runtime-serial` - A simplistic serial runner, to experiment with the `SDK`. -* `da-streamingledger-examples` - Streaming ledger example programs. - -A parallel runner exists as part of the dA platform, -you can learn more about the dA platform here: [dA Platform](https://data-artisans.com/platform-overview) - -## Example - -Let's create a simple ledger of user accounts. -An account in the ledger is identified by a `String` key, -and has a `Long` value (its balance). - -We start by defining the streaming ledger scope. All state definitions and transaction functions -would be bound to this named scope `"Account Ledger"`. - -```java - StreamingLedger ledger = StreamingLedger.create("Account Ledger"); -``` - -Next, we define the accounts state. - -```java - StreamingLedger.State accounts = ledger.declareState("accounts") - .withKeyType(String.class) - .withValueType(Long.class); -``` - -Next, let's assume we have a `DataStream` of `TransactionEvent`s, with the following schema: - -```java -final class TransactionEvent { - - private final String sourceAccountId; - - private final String targetAccountId; - - private final long accountTransfer; - - ... - - public String getSourceAccountId() { - return sourceAccountId; - } - - public String getTargetAccountId() { - return targetAccountId; - } - - public long getAccountTransfer() { - return accountTransfer; - } - - ... -} - -``` -And we would like to transfer money from the source account to the target account, in response to an incoming `TransactionEvent`. - -```java - DataStream transactions = ... - - ledger.usingStream(transactions, "transaction stream") - .apply(new TxnHandler()) - .on(accounts, TransactionEvent::getSourceAccountId, "source-account", READ_WRITE) - .on(accounts, TransactionEvent::getTargetAccountId, "target-account", READ_WRITE); -``` - -Where `TxnHandler` is a `TransactionProcessFunction` defined as: - -```java -class TxnHandler extends TransactionProcessFunction { - - @ProcessTransaction - public void process( - TransactionEvent txn, - Context transactionContext, - @State("source-account") StateAccess sourceAccount, - @State("target-account") StateAccess targetAccount) { - - final long sourceAccountBalance = sourceAccount.read(); - final long targetAccountBalance = targetAccount.read(); - - // check the preconditions - if (sourceAccountBalance > txn.getAccountTransfer()) { - - // compute the new balances - long newSourceBalance = sourceAccountBalance - txn.getAccountTransfer(); - long newTargetBalance = targetAccountBalance + txn.getAccountTransfer(); - - // write back the updated values - sourceAccount.write(newSourceBalance); - targetAccount.write(newTargetBalance); - } - } - } -``` - -Note that `TxnHandler` will be executed with the following guaranties: -* *Atomicity:* The transaction applies all changes in an atomic manner. Either all of the modifications that the transaction function performs on the rows happen, or none. - -* *Consistency:* The transaction brings the tables from one consistent state into another consistent state. - -* *Isolation:* Each transaction executes as if it were the only transaction operating on the tables. Databases know different isolation levels with different guarantees. data Artisans Streaming Ledger here offers the best class: serializability. - -* *Durability:* The changes made by a transaction are durable and are not lost. Durability is ensured in the same way as in other Flink applications – through persistent sources and checkpoints. In the asynchronous nature of stream processing, durability of a result can only be assumed after a checkpoint. - - -A more complete example can be found [here](https://github.com/dataArtisans/da-streamingledger/blob/master/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/SimpleTradeExample.java) - -## Building from source - -prerequisites: - -* git -* Maven -* At least Java 8 - -``` -git clone https://github.com/dataArtisans/da-streamingledger.git -cd da-streamingledger -mvn clean install -``` - -data Artisans Streaming Ledger is now available at your local `.m2` repository. - -## Obtaining from Maven Central - -Just add the following dependency to start experimenting with the `SDK` - -``` - - com.data-artisans.streamingledger - da-streamingledger-sdk - 1.0.0 - - - com.data-artisans.streamingledger - da-streamingledger-runtime-serial - 1.0.0 - -``` - -## License - -The code in this repository is under the Apache license, see [license](https://github.com/dataArtisans/da-streamingledger/blob/master/LICENSE) +**Streaming Ledger API has been moved to a [new repository](https://github.com/ververica/streaming-ledger).** + +---- +*Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.* diff --git a/da-streamingledger-examples/pom.xml b/da-streamingledger-examples/pom.xml deleted file mode 100644 index 2d0f7f9..0000000 --- a/da-streamingledger-examples/pom.xml +++ /dev/null @@ -1,150 +0,0 @@ - - - - - 4.0.0 - - - com.data-artisans.streamingledger - da-streamingledger - 1.1-SNAPSHOT - .. - - - da-streamingledger-examples - da-streamingledger :: da-streamingledger-examples - - jar - - - - - org.apache.flink - flink-streaming-java_2.11 - ${flink.version} - provided - - - com.data-artisans.streamingledger - da-streamingledger-sdk - ${project.version} - - - com.data-artisans.streamingledger - da-streamingledger-runtime-serial - ${project.version} - - - - - org.slf4j - slf4j-log4j12 - runtime - - - - log4j - log4j - runtime - - - - - - - - - - org.apache.maven.plugins - maven-shade-plugin - 3.0.0 - - - - package - - shade - - - false - - - org.apache.flink:force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - log4j:* - net.bytebuddy:* - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - com.dataartisans.streamingledger.examples.simpletrade.SimpleTradeExample - - - - - - - - - - - - - - - - add-dependencies-for-IDEA - - - - idea.version - - - - - - org.apache.flink - flink-streaming-java_2.11 - ${flink.version} - compile - - - org.apache.flink - flink-runtime-web_2.11 - ${flink.version} - compile - - - - - - diff --git a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/DepositEvent.java b/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/DepositEvent.java deleted file mode 100644 index f670c94..0000000 --- a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/DepositEvent.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.examples.simpletrade; - -/** - * A simple data object that describes a deposit event. - */ -public class DepositEvent { - - private String accountId; - - private String bookEntryId; - - private long accountTransfer; - - private long bookEntryTransfer; - - /** - * Creates a new DepositEvent. - */ - public DepositEvent( - String accountId, - String bookEntryId, - long accountTransfer, - long bookEntryTransfer) { - this.accountId = accountId; - this.bookEntryId = bookEntryId; - this.accountTransfer = accountTransfer; - this.bookEntryTransfer = bookEntryTransfer; - } - - public DepositEvent() { - } - - - // ------------------------------------------------------------------------ - // properties - // ------------------------------------------------------------------------ - - public String getAccountId() { - return accountId; - } - - public void setAccountId(String accountId) { - this.accountId = accountId; - } - - public String getBookEntryId() { - return bookEntryId; - } - - public void setBookEntryId(String bookEntryId) { - this.bookEntryId = bookEntryId; - } - - public long getAccountTransfer() { - return accountTransfer; - } - - public void setAccountTransfer(long accountTransfer) { - this.accountTransfer = accountTransfer; - } - - public long getBookEntryTransfer() { - return bookEntryTransfer; - } - - public void setBookEntryTransfer(long bookEntryTransfer) { - this.bookEntryTransfer = bookEntryTransfer; - } - - // ------------------------------------------------------------------------ - // miscellaneous - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "DepositEvent {" - + "accountId=" + accountId - + ", bookEntryId=" + bookEntryId - + ", accountTransfer=" + accountTransfer - + ", bookEntryTransfer=" + bookEntryTransfer - + '}'; - } -} diff --git a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/SimpleTradeExample.java b/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/SimpleTradeExample.java deleted file mode 100644 index 5b8fccd..0000000 --- a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/SimpleTradeExample.java +++ /dev/null @@ -1,186 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.examples.simpletrade; - -import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.OutputTag; - -import com.dataartisans.streamingledger.examples.simpletrade.generator.SyntheticSources; -import com.dataartisans.streamingledger.sdk.api.StateAccess; -import com.dataartisans.streamingledger.sdk.api.StreamingLedger; -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.ResultStreams; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction; - -import java.net.URI; -import java.nio.file.Paths; -import java.util.function.Supplier; - -import static com.dataartisans.streamingledger.sdk.api.AccessType.READ_WRITE; - -/** - * A simple example illustrating the use of stream ledger. - * - *

The example here uses two states (called "accounts" and "bookEntries") and modifies two keys in each state in one - * joint transaction. - */ -public class SimpleTradeExample { - - private static final Supplier ZERO = () -> 0L; - - /** - * The main entry point to the sample application. This runs the program with a - * built-in data generator and the non-parallel local runtime implementation for - * the transaction logic. - * - * @param args The command line arguments. - */ - public static void main(String[] args) throws Exception { - - // set up the execution environment and the configuration - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - // configure Flink - env.setParallelism(4); - env.getConfig().enableObjectReuse(); - - // enable checkpoints once a minute - env.enableCheckpointing(60_000); - URI uri = Paths.get("./checkpoints").toAbsolutePath().normalize().toUri(); - StateBackend backend = new FsStateBackend(uri, true); - env.setStateBackend(backend); - - // create and add two data sources - SyntheticSources sources = SyntheticSources.create(env, 1); - - // start building the transactional streams - StreamingLedger tradeLedger = StreamingLedger.create("simple trade example"); - - // define the transactional states - StreamingLedger.State accounts = tradeLedger.declareState("accounts") - .withKeyType(String.class) - .withValueType(Long.class); - - StreamingLedger.State books = tradeLedger.declareState("bookEntries") - .withKeyType(String.class) - .withValueType(Long.class); - - // produce the deposits transaction stream - DataStream deposits = sources.deposits; - - // define transactors on states - tradeLedger.usingStream(deposits, "deposits") - .apply(new DepositHandler()) - .on(accounts, DepositEvent::getAccountId, "account", READ_WRITE) - .on(books, DepositEvent::getBookEntryId, "asset", READ_WRITE); - - // produce transactions stream - DataStream transfers = sources.transactions; - - OutputTag transactionResults = tradeLedger.usingStream(transfers, "transactions") - .apply(new TxnHandler()) - .on(accounts, TransactionEvent::getSourceAccountId, "source-account", READ_WRITE) - .on(accounts, TransactionEvent::getTargetAccountId, "target-account", READ_WRITE) - .on(books, TransactionEvent::getSourceBookEntryId, "source-asset", READ_WRITE) - .on(books, TransactionEvent::getTargetBookEntryId, "target-asset", READ_WRITE) - .output(); - - // compute the resulting streams. - ResultStreams resultsStreams = tradeLedger.resultStreams(); - - // output to the console - resultsStreams.getResultStream(transactionResults).print(); - - // trigger program execution - env.execute(); - } - - /** - * The implementation of the logic that executes a deposit. - */ - private static final class DepositHandler extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process( - final DepositEvent event, - final Context ctx, - final @State("account") StateAccess account, - final @State("asset") StateAccess asset) { - - long newAccountValue = account.readOr(ZERO) + event.getAccountTransfer(); - - account.write(newAccountValue); - - long newAssetValue = asset.readOr(ZERO) + event.getBookEntryTransfer(); - asset.write(newAssetValue); - } - } - - /** - * The implementation of the logic that executes the transaction. The logic is given the original - * TransactionEvent plus all states involved in the transaction. - */ - private static final class TxnHandler extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process( - final TransactionEvent txn, - final Context ctx, - final @State("source-account") StateAccess sourceAccount, - final @State("target-account") StateAccess targetAccount, - final @State("source-asset") StateAccess sourceAsset, - final @State("target-asset") StateAccess targetAsset) { - - final long sourceAccountBalance = sourceAccount.readOr(ZERO); - final long sourceAssetValue = sourceAsset.readOr(ZERO); - final long targetAccountBalance = targetAccount.readOr(ZERO); - final long targetAssetValue = targetAsset.readOr(ZERO); - - // check the preconditions - if (sourceAccountBalance > txn.getMinAccountBalance() - && sourceAccountBalance > txn.getAccountTransfer() - && sourceAssetValue > txn.getBookEntryTransfer()) { - - // compute the new balances - final long newSourceBalance = sourceAccountBalance - txn.getAccountTransfer(); - final long newTargetBalance = targetAccountBalance + txn.getAccountTransfer(); - final long newSourceAssets = sourceAssetValue - txn.getBookEntryTransfer(); - final long newTargetAssets = targetAssetValue + txn.getBookEntryTransfer(); - - // write back the updated values - sourceAccount.write(newSourceBalance); - targetAccount.write(newTargetBalance); - sourceAsset.write(newSourceAssets); - targetAsset.write(newTargetAssets); - - // emit result event with updated balances and flag to mark transaction as processed - ctx.emit(new TransactionResult(txn, true, newSourceBalance, newTargetBalance)); - } - else { - // emit result with unchanged balances and a flag to mark transaction as rejected - ctx.emit(new TransactionResult(txn, false, sourceAccountBalance, targetAccountBalance)); - } - } - } - -} diff --git a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/TransactionEvent.java b/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/TransactionEvent.java deleted file mode 100644 index 2d859c7..0000000 --- a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/TransactionEvent.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.examples.simpletrade; - -/** - * A simple data object that describes a transaction. - */ -public class TransactionEvent { - - private String sourceAccountId; - - private String targetAccountId; - - private String sourceBookEntryId; - - private String targetBookEntryId; - - private long accountTransfer; - - private long bookEntryTransfer; - - private long minAccountBalance; - - /** - * Creates a new TransactionEvent for the given accounts and book entries. - */ - public TransactionEvent( - String sourceAccountId, - String targetAccountId, - String sourceBookEntryId, - String targetBookEntryId, - long accountTransfer, - long bookEntryTransfer, - long minAccountBalance) { - - this.sourceAccountId = sourceAccountId; - this.targetAccountId = targetAccountId; - this.sourceBookEntryId = sourceBookEntryId; - this.targetBookEntryId = targetBookEntryId; - this.accountTransfer = accountTransfer; - this.bookEntryTransfer = bookEntryTransfer; - this.minAccountBalance = minAccountBalance; - } - - public TransactionEvent() { - } - - // ------------------------------------------------------------------------ - // properties - // ------------------------------------------------------------------------ - - public String getSourceAccountId() { - return sourceAccountId; - } - - public void setSourceAccountId(String sourceAccountId) { - this.sourceAccountId = sourceAccountId; - } - - public String getTargetAccountId() { - return targetAccountId; - } - - public void setTargetAccountId(String targetAccountId) { - this.targetAccountId = targetAccountId; - } - - public String getSourceBookEntryId() { - return sourceBookEntryId; - } - - public void setSourceBookEntryId(String sourceBookEntryId) { - this.sourceBookEntryId = sourceBookEntryId; - } - - public String getTargetBookEntryId() { - return targetBookEntryId; - } - - public void setTargetBookEntryId(String targetBookEntryId) { - this.targetBookEntryId = targetBookEntryId; - } - - public long getAccountTransfer() { - return accountTransfer; - } - - public void setAccountTransfer(long accountTransfer) { - this.accountTransfer = accountTransfer; - } - - public long getBookEntryTransfer() { - return bookEntryTransfer; - } - - public void setBookEntryTransfer(long bookEntryTransfer) { - this.bookEntryTransfer = bookEntryTransfer; - } - - public long getMinAccountBalance() { - return minAccountBalance; - } - - public void setMinAccountBalance(long minAccountBalance) { - this.minAccountBalance = minAccountBalance; - } - - // ------------------------------------------------------------------------ - // miscellaneous - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "TransactionEvent {" - + "sourceAccountId=" + sourceAccountId - + ", targetAccountId=" + targetAccountId - + ", sourceBookEntryId=" + sourceBookEntryId - + ", targetBookEntryId=" + targetBookEntryId - + ", accountTransfer=" + accountTransfer - + ", bookEntryTransfer=" + bookEntryTransfer - + ", minAccountBalance=" + minAccountBalance - + '}'; - } -} diff --git a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/TransactionResult.java b/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/TransactionResult.java deleted file mode 100644 index 3fbeece..0000000 --- a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/TransactionResult.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.examples.simpletrade; - -import static java.util.Objects.requireNonNull; - -/** - * Data type describing the result of a processed transaction. - * It describes whether the transaction was successful as well as the resulting account balances. - */ -public class TransactionResult { - - private TransactionEvent transaction; - - private boolean success; - - private long newSourceAccountBalance; - - private long newTargetAccountBalance; - - /** - * Creates a new transaction result. - * - * @param transaction The original transaction event. - * @param success True, if the transaction was successful, false if not. - * @param newSourceAccountBalance The resulting balance of the source account. - * @param newTargetAccountBalance The resulting balance of the target account. - */ - public TransactionResult( - TransactionEvent transaction, - boolean success, - long newSourceAccountBalance, - long newTargetAccountBalance) { - - this.transaction = requireNonNull(transaction); - this.success = success; - this.newSourceAccountBalance = newSourceAccountBalance; - this.newTargetAccountBalance = newTargetAccountBalance; - } - - public TransactionResult() { - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - public TransactionEvent getTransaction() { - return transaction; - } - - public void setTransaction(TransactionEvent transaction) { - this.transaction = transaction; - } - - public boolean isSuccess() { - return success; - } - - public void setSuccess(boolean success) { - this.success = success; - } - - public long getNewSourceAccountBalance() { - return newSourceAccountBalance; - } - - public void setNewSourceAccountBalance(long newSourceAccountBalance) { - this.newSourceAccountBalance = newSourceAccountBalance; - } - - public long getNewTargetAccountBalance() { - return newTargetAccountBalance; - } - - public void setNewTargetAccountBalance(long newTargetAccountBalance) { - this.newTargetAccountBalance = newTargetAccountBalance; - } - - // ------------------------------------------------------------------------ - // Miscellaneous - // ------------------------------------------------------------------------ - - @Override - public String toString() { - return "TransactionResult {" - + "transaction=" + transaction - + ", success=" + success - + ", newSourceAccountBalance=" + newSourceAccountBalance - + ", newTargetAccountBalance=" + newTargetAccountBalance - + '}'; - } -} diff --git a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/generator/DepositsThenTransactionsSource.java b/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/generator/DepositsThenTransactionsSource.java deleted file mode 100644 index d3c40a7..0000000 --- a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/generator/DepositsThenTransactionsSource.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.examples.simpletrade.generator; - -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.types.Either; - -import com.dataartisans.streamingledger.examples.simpletrade.DepositEvent; -import com.dataartisans.streamingledger.examples.simpletrade.TransactionEvent; - -import java.util.SplittableRandom; - -import static org.apache.flink.util.Preconditions.checkArgument; - -/** - * A random data generator with data rate throttling logic. - * - *

This source emits two kinds of events {@link DepositEvent} and a {@link TransactionEvent}. First this source emits - * deposit events for each account and book entry, and then starts emitting random transaction events while not - * canceled. - */ -final class DepositsThenTransactionsSource extends RichParallelSourceFunction> { - - private static final int NUM_ROWS = 1_000_000; - private static final String ACCOUNT_ID_PREFIX = "ACCT-"; - private static final String BOOK_ENTRY_ID_PREFIX = "BOOK-"; - private static final long MAX_ACCOUNT_TRANSFER = 10_000; - private static final long MAX_BOOK_TRANSFER = 1_000; - private static final long MIN_BALANCE = 0; - - private static final long serialVersionUID = 1L; - - private final int maxRecordsPerSecond; - - private volatile boolean running = true; - - DepositsThenTransactionsSource(int maxRecordsPerSecond) { - checkArgument(maxRecordsPerSecond == -1 || maxRecordsPerSecond > 0, - "maxRecordsPerSecond must be positive or -1 (infinite)"); - this.maxRecordsPerSecond = maxRecordsPerSecond; - } - - @Override - public void run(SourceContext> context) throws Exception { - final int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks(); - final int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask(); - - collectDeposits(context, indexOfThisSubtask, numberOfParallelSubtasks); - collectTransactions(context, numberOfParallelSubtasks); - } - - @Override - public void cancel() { - running = false; - } - - private void collectDeposits( - SourceContext> context, - final int indexOfThisSubtask, - final int numberOfParallelSubtasks) { - - final int startId = (indexOfThisSubtask * NUM_ROWS) / numberOfParallelSubtasks; - final int endId = ((indexOfThisSubtask + 1) * NUM_ROWS) / numberOfParallelSubtasks; - - for (int i = startId; i < endId; i++) { - String accountId = ACCOUNT_ID_PREFIX + i; - String bookEntryId = BOOK_ENTRY_ID_PREFIX + i; - - DepositEvent event = new DepositEvent( - accountId, - bookEntryId, - MAX_ACCOUNT_TRANSFER, - MAX_BOOK_TRANSFER); - - context.collect(Either.Left(event)); - } - } - - private void collectTransactions( - SourceContext> context, - int numberOfParallelSubtasks) throws InterruptedException { - - SplittableRandom random = new SplittableRandom(); - Throttler throttler = new Throttler(maxRecordsPerSecond, numberOfParallelSubtasks); - while (running) { - TransactionEvent event = randomTransactionEvent(random); - context.collect(Either.Right(event)); - throttler.throttle(); - } - } - - private TransactionEvent randomTransactionEvent(SplittableRandom rnd) { - final long accountsTransfer = rnd.nextLong(MAX_ACCOUNT_TRANSFER); - final long transfer = rnd.nextLong(MAX_BOOK_TRANSFER); - while (true) { - final int sourceAcct = rnd.nextInt(NUM_ROWS); - final int targetAcct = rnd.nextInt(NUM_ROWS); - final int sourceBook = rnd.nextInt(NUM_ROWS); - final int targetBook = rnd.nextInt(NUM_ROWS); - - if (sourceAcct == targetAcct || sourceBook == targetBook) { - continue; - } - return new TransactionEvent( - ACCOUNT_ID_PREFIX + sourceAcct, - ACCOUNT_ID_PREFIX + targetAcct, - BOOK_ENTRY_ID_PREFIX + sourceBook, - BOOK_ENTRY_ID_PREFIX + targetBook, - accountsTransfer, - transfer, - MIN_BALANCE); - } - } - -} diff --git a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/generator/SyntheticSources.java b/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/generator/SyntheticSources.java deleted file mode 100644 index aced21d..0000000 --- a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/generator/SyntheticSources.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.examples.simpletrade.generator; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.types.Either; -import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; - -import com.dataartisans.streamingledger.examples.simpletrade.DepositEvent; -import com.dataartisans.streamingledger.examples.simpletrade.TransactionEvent; - -/** - * Creates two synthetic sources for {@link DepositEvent} and {@link TransactionEvent}. - */ -public final class SyntheticSources { - - public final DataStream deposits; - public final DataStream transactions; - - /** - * Creates and adds two synthetic sources for {@link DepositEvent} and {@link TransactionEvent}. - * - * @param env the streaming environment to add the sources to. - * @param recordsPerSecond the number of {@link TransactionEvent} per second to generate. - * @return a {@link DataStream} for each event type generated. - */ - public static SyntheticSources create(StreamExecutionEnvironment env, int recordsPerSecond) { - - final DataStreamSource> depositsAndTransactions = env.addSource( - new DepositsThenTransactionsSource(recordsPerSecond)); - - final OutputTag transactionsSideOutput = new OutputTag<>( - "transactions side output", - TypeInformation.of(TransactionEvent.class)); - - final SingleOutputStreamOperator deposits = depositsAndTransactions.process( - new ProcessFunction, DepositEvent>() { - - @Override - public void processElement( - Either depositOrTransaction, - Context context, - Collector out) { - - if (depositOrTransaction.isLeft()) { - out.collect(depositOrTransaction.left()); - } - else { - context.output(transactionsSideOutput, depositOrTransaction.right()); - } - } - }); - - final DataStream transactions = deposits.getSideOutput(transactionsSideOutput); - - return new SyntheticSources(deposits, transactions); - } - - SyntheticSources(DataStream deposits, DataStream transactions) { - this.deposits = deposits; - this.transactions = transactions; - } -} - diff --git a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/generator/Throttler.java b/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/generator/Throttler.java deleted file mode 100644 index c917163..0000000 --- a/da-streamingledger-examples/src/main/java/com/dataartisans/streamingledger/examples/simpletrade/generator/Throttler.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.examples.simpletrade.generator; - -import static org.apache.flink.util.Preconditions.checkArgument; - -final class Throttler { - - private final long throttleBatchSize; - private final long nanosPerBatch; - - private long endOfNextBatchNanos; - private int currentBatch; - - Throttler(long maxRecordsPerSecond, int numberOfParallelSubtasks) { - checkArgument(maxRecordsPerSecond == -1 || maxRecordsPerSecond > 0, - "maxRecordsPerSecond must be positive or -1 (infinite)"); - checkArgument(numberOfParallelSubtasks > 0, "numberOfParallelSubtasks must be greater than 0"); - - if (maxRecordsPerSecond == -1) { - // unlimited speed - throttleBatchSize = -1; - nanosPerBatch = 0; - endOfNextBatchNanos = System.nanoTime() + nanosPerBatch; - currentBatch = 0; - return; - } - final float ratePerSubtask = - (float) maxRecordsPerSecond / numberOfParallelSubtasks; - - if (ratePerSubtask >= 10000) { - // high rates: all throttling in intervals of 2ms - throttleBatchSize = (int) ratePerSubtask / 500; - nanosPerBatch = 2_000_000L; - } - else { - throttleBatchSize = ((int) (ratePerSubtask / 20)) + 1; - nanosPerBatch = ((int) (1_000_000_000L / ratePerSubtask)) * throttleBatchSize; - } - this.endOfNextBatchNanos = System.nanoTime() + nanosPerBatch; - this.currentBatch = 0; - } - - void throttle() throws InterruptedException { - if (throttleBatchSize == -1) { - return; - } - if (++currentBatch != throttleBatchSize) { - return; - } - currentBatch = 0; - - final long now = System.nanoTime(); - final int millisRemaining = (int) ((endOfNextBatchNanos - now) / 1_000_000); - - if (millisRemaining > 0) { - endOfNextBatchNanos += nanosPerBatch; - Thread.sleep(millisRemaining); - } - else { - endOfNextBatchNanos = now + nanosPerBatch; - } - } - -} diff --git a/da-streamingledger-examples/src/main/resources/log4j.properties b/da-streamingledger-examples/src/main/resources/log4j.properties deleted file mode 100644 index bf1fd13..0000000 --- a/da-streamingledger-examples/src/main/resources/log4j.properties +++ /dev/null @@ -1,19 +0,0 @@ -# -# Copyright 2018 Data Artisans GmbH -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -log4j.rootLogger=INFO, console -log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.layout=org.apache.log4j.PatternLayout -log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n diff --git a/da-streamingledger-runtime-serial/pom.xml b/da-streamingledger-runtime-serial/pom.xml deleted file mode 100644 index 71eb9a7..0000000 --- a/da-streamingledger-runtime-serial/pom.xml +++ /dev/null @@ -1,48 +0,0 @@ - - - - - 4.0.0 - - - com.data-artisans.streamingledger - da-streamingledger - 1.1-SNAPSHOT - .. - - - da-streamingledger-runtime-serial - da-streamingledger :: da-streamingledger-runtime-serial - - jar - - - - com.data-artisans.streamingledger - da-streamingledger-sdk - ${project.version} - - - org.apache.flink - flink-streaming-java_2.11 - ${flink.version} - provided - - - - diff --git a/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SerialStateAccess.java b/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SerialStateAccess.java deleted file mode 100644 index 4dc5ec0..0000000 --- a/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SerialStateAccess.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.runtime.serial; - -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.java.functions.KeySelector; - -import com.dataartisans.streamingledger.sdk.api.AccessType; -import com.dataartisans.streamingledger.sdk.api.StateAccess; -import com.dataartisans.streamingledger.sdk.api.StateAccessException; -import com.dataartisans.streamingledger.sdk.api.StateNotReadableException; -import com.dataartisans.streamingledger.sdk.api.StateNotWritableException; -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.StateAccessSpec; - -import static java.util.Objects.requireNonNull; - -final class SerialStateAccess implements StateAccess { - private final StateAccessSpec spec; - private final MapState state; - - private final KeySelector keySelector; - private final boolean writeOnly; - private final boolean readOnly; - - private K key; - private V value; - private boolean changed; - - SerialStateAccess(StateAccessSpec spec, MapState state) { - this.spec = requireNonNull(spec); - this.state = requireNonNull(state); - this.keySelector = requireNonNull(spec.keyAccess); - this.writeOnly = spec.accessType == AccessType.WRITE; - this.readOnly = spec.accessType == AccessType.READ; - } - - @Override - public V read() throws StateAccessException { - if (writeOnly) { - throw new StateNotReadableException(this); - } - return value; - } - - @Override - public void write(V newValue) throws StateAccessException { - if (readOnly) { - throw new StateNotWritableException(this); - } - this.value = newValue; - this.changed = true; - } - - @Override - public void delete() throws StateAccessException { - if (readOnly) { - throw new StateNotWritableException(this); - } - this.value = null; - this.changed = true; - } - - @Override - public String getStateName() { - return spec.state.getName(); - } - - @Override - public String getStateAccessName() { - return spec.bindName; - } - - // ----------------------------------------------------- - // For internal use by SerialTransactor - // ----------------------------------------------------- - - void prepare(InT input) throws Exception { - final K key = keySelector.getKey(input); - this.key = key; - this.changed = false; - if (!writeOnly) { - this.value = state.get(key); - } - } - - void commit(boolean wasAborted) throws Exception { - if (!changed || wasAborted) { - return; - } - if (value == null) { - state.remove(key); - } - else { - state.put(key, value); - } - } -} diff --git a/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SerialStreamingLedgerRuntimeProvider.java b/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SerialStreamingLedgerRuntimeProvider.java deleted file mode 100644 index 1fac4c2..0000000 --- a/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SerialStreamingLedgerRuntimeProvider.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.runtime.serial; - -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.KeyedStream; -import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.util.OutputTag; - -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.ResultStreams; -import com.dataartisans.streamingledger.sdk.common.union.TaggedElement; -import com.dataartisans.streamingledger.sdk.common.union.Union; -import com.dataartisans.streamingledger.sdk.spi.InputAndSpec; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerRuntimeProvider; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpec; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -/** - * A {@link StreamingLedgerRuntimeProvider} for the default serial transaction implementation. - */ -public final class SerialStreamingLedgerRuntimeProvider implements StreamingLedgerRuntimeProvider { - - private static DataStream union(List> inputAndSpecs) { - List> inputs = inputAndSpecs.stream() - .map(inputAndSpec -> inputAndSpec.inputStream) - .collect(Collectors.toList()); - - return Union.apply(inputs); - } - - // ------------------------------------------------------------------------------------------------------- - // Helpers - // ------------------------------------------------------------------------------------------------------- - - private static List> createSideOutputTags(List> specs) { - List> outputTags = new ArrayList<>(); - for (InputAndSpec streamWithSpec : specs) { - OutputTag outputTag = new OutputTag<>(streamWithSpec.streamName, streamWithSpec.streamSpec.resultType); - outputTags.add(outputTag); - } - return outputTags; - } - - private static List> specs(List> inputAndSpecs) { - return inputAndSpecs.stream() - .map(inputAndSpec -> inputAndSpec.streamSpec) - .collect(Collectors.toList()); - } - - @Override - public ResultStreams translate(String name, List> streamLedgerSpecs) { - List> sideOutputTags = createSideOutputTags(streamLedgerSpecs); - - // the input stream is a union of different streams. - KeyedStream input = union(streamLedgerSpecs) - .keyBy(unused -> true); - - // main pipeline - String serialTransactorName = "SerialTransactor(" + name + ")"; - SingleOutputStreamOperator resultStream = input - .process(new SerialTransactor(specs(streamLedgerSpecs), sideOutputTags)) - .name(serialTransactorName) - .uid(serialTransactorName + "___SERIAL_TX") - .forceNonParallel() - .returns(Void.class); - - // gather the sideOutputs. - Map> output = new HashMap<>(); - for (OutputTag outputTag : sideOutputTags) { - DataStream rs = resultStream.getSideOutput(outputTag); - output.put(outputTag.getId(), rs); - } - return new ResultStreams(output); - } -} diff --git a/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SerialTransactor.java b/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SerialTransactor.java deleted file mode 100644 index 4c88303..0000000 --- a/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SerialTransactor.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.runtime.serial; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.util.Collector; -import org.apache.flink.util.OutputTag; - -import com.dataartisans.streamingledger.sdk.common.union.TaggedElement; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpec; - -import java.util.List; -import java.util.stream.Collectors; - -import static java.util.Objects.requireNonNull; - -final class SerialTransactor extends ProcessFunction { - - private static final long serialVersionUID = 1; - - private final List> specs; - private final List> sideOutputs; - private final SideOutputContext collector; - - private transient SingleStreamSerialTransactor[] transactors; - - SerialTransactor(List> specs, List> sideOutputTags) { - this.specs = requireNonNull(specs); - this.sideOutputs = castOutputTags(sideOutputTags); - this.collector = new SideOutputContext<>(); - } - - @SuppressWarnings("unchecked") - private static SingleStreamSerialTransactor[] newSingleStreamSerialTransactorArray(int n) { - return (SingleStreamSerialTransactor[]) new SingleStreamSerialTransactor[n]; - } - - @SuppressWarnings("unchecked") - private static SingleStreamSerialTransactor singleStreamSerialTransactorFromSpec( - StreamingLedgerSpec aSpec, - OutputTag aTag, - SideOutputContext collector, - RuntimeContext runtimeContext) { - - OutputTag outputTag = (OutputTag) aTag; - StreamingLedgerSpec spec = (StreamingLedgerSpec) aSpec; - return new SingleStreamSerialTransactor<>(spec, outputTag, collector, runtimeContext); - } - - @SuppressWarnings("unchecked") - private static List> castOutputTags(List> sideOutputTags) { - return sideOutputTags.stream() - .map(outputTag -> (OutputTag) outputTag) - .collect(Collectors.toList()); - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - final RuntimeContext runtimeContext = getRuntimeContext(); - - SingleStreamSerialTransactor[] transactors = - newSingleStreamSerialTransactorArray(specs.size()); - - // initialize the individual transactors - for (int streamTag = 0; streamTag < specs.size(); streamTag++) { - StreamingLedgerSpec aSpec = specs.get(streamTag); - OutputTag aTag = sideOutputs.get(streamTag); - transactors[streamTag] = singleStreamSerialTransactorFromSpec(aSpec, aTag, collector, runtimeContext); - } - this.transactors = transactors; - } - - @Override - public void processElement(TaggedElement input, Context context, Collector unused) throws Exception { - collector.setContext(context); - - final int streamTag = input.getDataStreamTag(); - SingleStreamSerialTransactor transactor = transactors[streamTag]; - transactor.apply(input.getElement()); - } -} diff --git a/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SideOutputContext.java b/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SideOutputContext.java deleted file mode 100644 index 5d6db23..0000000 --- a/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SideOutputContext.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.runtime.serial; - -import org.apache.flink.streaming.api.functions.ProcessFunction; -import org.apache.flink.streaming.api.functions.ProcessFunction.Context; -import org.apache.flink.util.OutputTag; - -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction; - -import java.io.Serializable; -import java.util.ArrayList; - -final class SideOutputContext implements TransactionProcessFunction.Context, Serializable { - - private static final long serialVersionUID = 1; - private final ArrayList records = new ArrayList<>(); - private transient OutputTag outputTag; - private transient ProcessFunction.Context context; - private boolean aborted; - - void setContext(Context context) { - this.context = context; - } - - void setOutputTag(OutputTag outputTag) { - this.outputTag = outputTag; - } - - void prepare() { - aborted = false; - records.clear(); - } - - @Override - public void abort() { - aborted = true; - records.clear(); - } - - @SuppressWarnings("unchecked") - @Override - public void emit(T record) { - records.add(record); - } - - boolean wasAborted() { - return aborted; - } - - void emitChanges() { - if (aborted) { - return; - } - for (T record : records) { - context.output(outputTag, record); - } - } -} diff --git a/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SingleStreamSerialTransactor.java b/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SingleStreamSerialTransactor.java deleted file mode 100644 index 6720cc2..0000000 --- a/da-streamingledger-runtime-serial/src/main/java/com/dataartisans/streamingledger/runtime/serial/SingleStreamSerialTransactor.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.runtime.serial; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; -import org.apache.flink.util.OutputTag; - -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.StateAccessSpec; -import com.dataartisans.streamingledger.sdk.common.reflection.ByteBuddyProcessFunctionInvoker; -import com.dataartisans.streamingledger.sdk.common.reflection.ProcessFunctionInvoker; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpec; - -import static java.util.Objects.requireNonNull; - -/** - * A serially executing transactor. - * - * @param transaction event type - * @param transaction result type - */ -final class SingleStreamSerialTransactor { - private final ProcessFunctionInvoker userFunction; - private final SerialStateAccess[] accesses; - private final OutputTag sideOutputTag; - private final SideOutputContext context; - - SingleStreamSerialTransactor( - StreamingLedgerSpec spec, - OutputTag sideOutputTag, - SideOutputContext context, - RuntimeContext runtimeContext) { - - requireNonNull(spec); - requireNonNull(sideOutputTag); - requireNonNull(runtimeContext); - - this.context = context; - this.sideOutputTag = sideOutputTag; - this.accesses = createStateAccessesFromSpec(spec, runtimeContext); - this.userFunction = ByteBuddyProcessFunctionInvoker.create(spec); - } - - @SuppressWarnings("unchecked") - private static SerialStateAccess[] newStateAccessArray(int n) { - return (SerialStateAccess[]) new SerialStateAccess[n]; - } - - // -------------------------------------------------------------------------------------------------------- - // Internal helpers - // -------------------------------------------------------------------------------------------------------- - - private static MapState fromSpec(StateAccessSpec spec, RuntimeContext context) { - MapStateDescriptor descriptor = new MapStateDescriptor<>( - spec.state.getName(), - spec.state.getKeyType(), - spec.state.getValueType()); - - return context.getMapState(descriptor); - } - - void apply(InT input) throws Exception { - context.setOutputTag(sideOutputTag); - // - // prepare - // - for (SerialStateAccess access : accesses) { - access.prepare(input); - } - context.prepare(); - // - // invoke - // - userFunction.invoke(input, context, accesses); - // - // commit or abort - // - for (SerialStateAccess access : accesses) { - access.commit(context.wasAborted()); - } - context.emitChanges(); - } - - private SerialStateAccess[] createStateAccessesFromSpec( - StreamingLedgerSpec spec, - RuntimeContext ctx) { - - SerialStateAccess[] accesses = newStateAccessArray(spec.stateBindings.size()); - - for (int i = 0; i < accesses.length; i++) { - final StateAccessSpec accessSpec = spec.stateBindings.get(i); - - MapState state = fromSpec(accessSpec, ctx); - - @SuppressWarnings({"rawtypes", "unchecked"}) - SerialStateAccess serialStateAccess = new SerialStateAccess(accessSpec, state); - accesses[i] = serialStateAccess; - } - return accesses; - } -} diff --git a/da-streamingledger-runtime-serial/src/main/resources/META-INF/services/com.dataartisans.streamingledger.sdk.spi.StreamingLedgerRuntimeProvider b/da-streamingledger-runtime-serial/src/main/resources/META-INF/services/com.dataartisans.streamingledger.sdk.spi.StreamingLedgerRuntimeProvider deleted file mode 100644 index 2068acb..0000000 --- a/da-streamingledger-runtime-serial/src/main/resources/META-INF/services/com.dataartisans.streamingledger.sdk.spi.StreamingLedgerRuntimeProvider +++ /dev/null @@ -1,17 +0,0 @@ -# -# Copyright 2018 Data Artisans GmbH -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -com.dataartisans.streamingledger.runtime.serial.SerialStreamingLedgerRuntimeProvider diff --git a/da-streamingledger-sdk/pom.xml b/da-streamingledger-sdk/pom.xml deleted file mode 100644 index 93ac77a..0000000 --- a/da-streamingledger-sdk/pom.xml +++ /dev/null @@ -1,123 +0,0 @@ - - - - - 4.0.0 - - - com.data-artisans.streamingledger - da-streamingledger - 1.1-SNAPSHOT - .. - - - da-streamingledger-sdk - da-streamingledger :: da-streamingledger-sdk - - jar - - - - org.apache.flink - flink-streaming-java_2.11 - ${flink.version} - provided - - - net.bytebuddy - byte-buddy - ${bytebuddy.version} - - - - - org.apache.flink - flink-test-utils-junit - ${flink.version} - test - - - org.apache.flink - flink-core - ${flink.version} - test-jar - test - - - - - - - - - org.apache.maven.plugins - maven-shade-plugin - 3.0.0 - - - - package - - shade - - - true - - - net.bytebuddy - com.dataartisans.streamingledger.shaded.net.bytebuddy - - - true - - - net.bytebuddy:* - - - - org.apache.flink:force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - log4j:* - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - - - - - - - - - - diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/AccessType.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/AccessType.java deleted file mode 100644 index acfd9a7..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/AccessType.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.api; - -/** - * The AccessType defines whether certain state is only read or written or both. - * - *

The type of access has an impact on whether accesses to the state (and key) by different - * transactions can execute independent of each other, or whether they need to be coordinated. - * For example, multiple transactions can concurrently read a certain state, if no other - * state currently writes the state. - */ -@SuppressWarnings("unused") -public enum AccessType { - - /** - * Indicates that state is only read, but not modified. - * If state is only read, it can never block any other transaction from advancing. - */ - READ(false, false), - - /** - * Indicates that state is modified, but not read before. Modifying state means that coordination - * is involved to ensure consistency for the write back. - */ - WRITE(true, true), - - /** - * Indicates that state is read and later modified. This access type involves coordination to - * ensure consistency for the write back, and possibly additional messages to read state. - */ - READ_WRITE(true, false); - - private final boolean requiresLocking; - private final boolean writeOnly; - - AccessType(boolean requiresLocking, boolean writeOnly) { - this.requiresLocking = requiresLocking; - this.writeOnly = writeOnly; - } - - public boolean requiresLocking() { - return requiresLocking; - } - - public boolean writeOnly() { - return writeOnly; - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateAccess.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateAccess.java deleted file mode 100644 index 66cb53c..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateAccess.java +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.api; - -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.State; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction.Context; - -import java.util.function.Supplier; - -/** - * This interface provides access to values of a {@link State}. - * - *

The state access binds a specific {@link State} value and - * a specific argument to a {@link TransactionProcessFunction} process function. - * - *

For example:

- * {@code
- * void process(InputEvent e, .., @State("age") StateAccess age) { .. }
- * }
- * 
- * Would bind {@code age} to a specific value (as identified by the input event {@code e}). - * Calling {@link #read()} would return the bound value, and calling {@link #write(Object)} would - * replace the value with the supplied argument. - * - *

Any change to this state access would made visible to other transactions, atomically as soon as the process - * function completes, unless explicitly aborted via {@link Context#abort()}. - * - * @param value parameter type. - */ -public interface StateAccess { - - /** - * Reads a value. - * - * @return the value bound to this state access. - * @throws StateAccessException if this state access is not readable. i.e. it is not one of {@link AccessType#READ} - * or {@link AccessType#READ_WRITE}. - */ - T read() throws StateAccessException; - - /** - * Reads a value by replacing missing values with a default supplier. - * - * @param defaultSupplier a default value supplier. - * @return the value bound to this state access or default otherwise. - * @throws StateAccessException if this state access is not readable. (only {@link AccessType#WRITE}) - */ - default T readOr(Supplier defaultSupplier) throws StateAccessException { - T read = read(); - if (read != null) { - return read; - } - return defaultSupplier.get(); - } - - /** - * Writes a value. - * - * @param newValue the value to bind with this state access. - * @throws StateAccessException if this state access is not writeable. (only {@link AccessType#READ}). - */ - void write(T newValue) throws StateAccessException; - - /** - * Deletes a value. - * - *

Please note: that it would also delete the key associated with this value. - * - * @throws StateAccessException if this state access is not writeable. (only {@link AccessType#READ}). - */ - void delete() throws StateAccessException; - - /** - * @return the {@link State} name that this state access is referencing. - */ - String getStateName(); - - /** - * @return the bind name of this state access. - */ - String getStateAccessName(); -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateAccessException.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateAccessException.java deleted file mode 100644 index 176b392..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateAccessException.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.api; - -/** - * Base exception for failures during a state access. - * - *

This is an unchecked exception, because instances of this exception are raised as a result - * of an incorrect program, for example when read and write accesses are incorrectly declared. - * Instances of this exception are hence rarely expected to be handled explicitly within an user's - * implementation of an application, but rather expected to "bubble up" and report a proper - * application failure. - */ -public class StateAccessException extends RuntimeException { - - private static final long serialVersionUID = 1L; - - public StateAccessException(String message) { - super(message); - } - - public StateAccessException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateNotReadableException.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateNotReadableException.java deleted file mode 100644 index 7bbd8d5..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateNotReadableException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.api; - -/** - * An exception that indicates that state was accessed for read, without being - * declared as readable. - */ -public class StateNotReadableException extends StateAccessException { - - private static final long serialVersionUID = 1L; - - public StateNotReadableException(StateAccess state) { - super(String.format("State access %s for state %s has not declared read access.", - state.getStateAccessName(), state.getStateName())); - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateNotWritableException.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateNotWritableException.java deleted file mode 100644 index 7e16832..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StateNotWritableException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.api; - -/** - * An exception that indicates that state was accessed for write, without being - * declared as writable. - */ -public class StateNotWritableException extends StateAccessException { - - private static final long serialVersionUID = 1L; - - public StateNotWritableException(StateAccess state) { - super(String.format("State access %s for state %s has not declared write access.", - state.getStateAccessName(), state.getStateName())); - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StreamingLedger.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StreamingLedger.java deleted file mode 100644 index 9412c3f..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/StreamingLedger.java +++ /dev/null @@ -1,487 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.api; - -import org.apache.flink.api.common.typeinfo.TypeHint; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.util.OutputTag; - -import com.dataartisans.streamingledger.sdk.spi.InputAndSpec; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerRuntimeLoader; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerRuntimeProvider; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpec; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpecFactory; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.annotation.Nullable; - -import static java.util.Objects.requireNonNull; - -/** - * StreamingLedger is a builder for a transactional operation on state involving multiple keys. - */ -@SuppressWarnings({"WeakerAccess", "unused"}) -public final class StreamingLedger { - - /** - * a mapping between a stream name and a {@link InternalStreamBuilder}. - */ - private final Map> streamBuilders = new HashMap<>(); - - /** - * declared {@link State} names. - */ - private final Set stateNames = new HashSet<>(); - - /** - * the name of this streaming ledger. - */ - private final String name; - - /** - * Holds the translation result. - */ - private ResultStreams resultStreams; - - /** - * Private constructor, instantiation should go through {@link #create(String)}. - */ - public StreamingLedger(String name) { - this.name = requireNonNull(name); - } - - // ------------------------------------------------------------------------------------------- - // Public API - // ------------------------------------------------------------------------------------------- - - public static StreamingLedger create(String transactionalStreamsName) { - return new StreamingLedger(transactionalStreamsName); - } - - private static List> specsFromBuilders(Map> builders) { - List> inputAndSpecs = new ArrayList<>(); - - builders.forEach((streamName, builder) -> { - - @SuppressWarnings({"unchecked", "raw"}) final InputAndSpec inputAndSpec = new InputAndSpec( - builder.getInputStream(), - streamName, - builder.buildTransactionalSpec() - ); - - inputAndSpecs.add(inputAndSpec); - }); - - return inputAndSpecs; - } - - /** - * Creates a {@link StateBuilder} for {@link State} with a given name. - * - * @param stateName a name to associate with this state. - */ - public StateBuilder declareState(String stateName) { - requireNonNull(stateName); - checkNotTranslated(); - - if (!stateNames.add(stateName)) { - throw new IllegalArgumentException("A state named '" + stateName + "' is already defined."); - } - return new StateBuilder(stateName); - } - - /** - * Creates a new streaming ledger operation for the events of the given data stream. -` * of a streaming ledger state operation. - * - * @param inputStream The data stream with the transaction data events. - * @param name The unique name for this operation, used for state matching during upgrades. - * @param The type of the transaction data events. - * @return An instance of the builder for the - */ - public StreamBuilder usingStream(DataStream inputStream, String name) { - requireNonNull(inputStream); - requireNonNull(name); - checkNotTranslated(); - - InternalStreamBuilder builder = new InternalStreamBuilder<>(inputStream, name); - if (streamBuilders.containsKey(name)) { - throw new IllegalArgumentException("stream named '" + name + "' already exists."); - } - streamBuilders.put(name, builder); - return new StreamBuilder<>(builder); - } - - /** - * Constructs a {@link ResultStreams} as defined. - */ - public ResultStreams resultStreams() { - if (resultStreams == null) { - List> specs = specsFromBuilders(streamBuilders); - StreamingLedgerRuntimeProvider runtimeProvider = StreamingLedgerRuntimeLoader.getRuntimeProvider(); - resultStreams = runtimeProvider.translate(name, specs); - } - return resultStreams; - } - - // ------------------------------------------------------------------------------------------- - // Nested Classes - // ------------------------------------------------------------------------------------------- - - private void checkNotTranslated() { - if (resultStreams != null) { - throw new IllegalStateException("Can't be called after calling getResultStream()."); - } - } - - /** - * A specification of a particular state access that happens as part of a transaction. - * - * @param The type of the transaction data event. - * @param The type of the key in this state access. - * @param The type of the value stored in the state. - */ - public static class StateAccessSpec implements Serializable { - - private static final long serialVersionUID = 1; - - /** - * The name under which this specific state access is passed to the {@link TransactionProcessFunction}'s - * process method. - */ - public final String bindName; - - /** - * The streaming ledger state to access. - */ - public final State state; - - /** - * The function that gets the key for this state access from the transaction data event. - */ - public final KeySelector keyAccess; - - /** - * The access type, whether state is only read, only written, or both. - */ - public final AccessType accessType; - - /** - * Creates a new StateAccessSpec. - * - * @param bindName The name under which this specific state access is passed to the - * {@link TransactionProcessFunction}'s process method. - * @param state The streaming ledger state to access. - * @param keyAccess The function that gets the key for this state access from the - * transaction data event. - * @param accessType The access type, whether state is only read, only written, or both. - */ - public StateAccessSpec( - String bindName, - State state, - KeySelector keyAccess, - AccessType accessType) { - this.bindName = requireNonNull(bindName); - this.state = requireNonNull(state); - this.keyAccess = requireNonNull(keyAccess); - this.accessType = requireNonNull(accessType); - } - } - - /** - * An abstract description of a streaming ledger transactional state maintained by Flink - * in a transactional scope (as built via {@link StreamingLedger}. - * - * @param The type of the key that accesses the state. - * @param The type of the value stored in the state. - */ - public static final class State implements Serializable { - - private static final long serialVersionUID = 1; - - private final String name; - private final TypeInformation keyType; - private final TypeInformation valueType; - - public State(String name, TypeInformation keyType, TypeInformation valueType) { - this.name = requireNonNull(name); - this.keyType = requireNonNull(keyType); - this.valueType = requireNonNull(valueType); - } - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - public String getName() { - return name; - } - - public TypeInformation getKeyType() { - return keyType; - } - - public TypeInformation getValueType() { - return valueType; - } - - @Override - public String toString() { - return "State (" + name + ')'; - } - } - - // ------------------------------------------------------------------------------------------- - // State Builder - // ------------------------------------------------------------------------------------------- - - /** - * An {@code ResultStreams} of a transactional stream. - */ - public static final class ResultStreams { - - private final Map> resultStreams; - - public ResultStreams(Map> resultStreams) { - this.resultStreams = resultStreams; - } - - public Iterable> getResultStreams() { - return resultStreams.values(); - } - - public DataStream getResultStream(OutputTag output) { - requireNonNull(output); - @SuppressWarnings("unchecked") DataStream stream = (DataStream) - resultStreams.get(output.getId()); - if (stream == null) { - throw new IllegalArgumentException("unknown stream named '" + output.getId() + "'."); - } - return stream; - } - } - - /** - * A {@link State} builder. - */ - public static final class StateBuilder { - private final String name; - - StateBuilder(String name) { - this.name = requireNonNull(name); - } - - public StateBuilderWithKeyType withKeyType(Class keyType) { - return withKeyType(TypeInformation.of(keyType)); - } - - public StateBuilderWithKeyType withKeyType(TypeHint keyType) { - return withKeyType(TypeInformation.of(keyType)); - } - - public StateBuilderWithKeyType withKeyType(TypeInformation keyType) { - return new StateBuilderWithKeyType<>(name, keyType); - } - } - - // ------------------------------------------------------------------------------------------- - // Stream Builder - // ------------------------------------------------------------------------------------------- - - /** - * A {@link State} builder with a {@code name} and {@code keyType}. - * - * @param key type - */ - public static final class StateBuilderWithKeyType { - private final String name; - private final TypeInformation keyType; - - StateBuilderWithKeyType(String name, TypeInformation keyType) { - this.name = requireNonNull(name); - this.keyType = requireNonNull(keyType); - } - - public State withValueType(Class valueType) { - return new State<>(name, keyType, TypeInformation.of(valueType)); - } - - public State valueType(TypeHint valueType) { - return new State<>(name, keyType, TypeInformation.of(valueType)); - } - - public State valueType(TypeInformation valueType) { - return new State<>(name, keyType, valueType); - } - } - - /** - * Internal builder that is created per input stream, and holds the necessary information gathered - * at different phase of {@link StreamingLedger#usingStream(DataStream, String)}. - */ - @SuppressWarnings("UnusedReturnValue") - private static final class InternalStreamBuilder { - - /** - * All state accesses (read, write, read-write) that the transaction processing requires. - */ - private final List> accesses; - - /** - * The input stream with the transaction data events. - */ - private final DataStream inputStream; - - /** - * The name associated with this stream. - */ - private final String name; - - /** - * Would be set by {@link #withProcessFunction(TransactionProcessFunction)}. - */ - @Nullable - private TransactionProcessFunction processFunction; - - /** - * Would be deduced by {@link #withProcessFunction(TransactionProcessFunction)}. - */ - @Nullable - private TypeInformation resultType; - - InternalStreamBuilder(DataStream inputStream, String name) { - this.inputStream = requireNonNull(inputStream); - this.accesses = new ArrayList<>(); - this.name = requireNonNull(name); - } - - /** - * deduce the result type of the supplied {@link TransactionProcessFunction}. - */ - private static TypeInformation deduceResultType( - TransactionProcessFunction processor, - TypeInformation inputType) { - - if (processor instanceof ResultTypeQueryable) { - @SuppressWarnings("unchecked") - TypeInformation producedType = ((ResultTypeQueryable) processor).getProducedType(); - return producedType; - } - return TypeExtractor.createTypeInfo( - TransactionProcessFunction.class, - processor.getClass(), - 1, - inputType, - null); - } - - InternalStreamBuilder withProcessFunction(TransactionProcessFunction processor) { - this.processFunction = requireNonNull(processor); - this.resultType = deduceResultType(processor, inputStream.getType()); - return this; - } - - InternalStreamBuilder withAccess(StateAccessSpec spec) { - accesses.add(spec); - return this; - } - - StreamingLedgerSpec buildTransactionalSpec() { - if (processFunction == null || resultType == null) { - throw new IllegalStateException("A TransactionProcessFunction is missing."); - } - return StreamingLedgerSpecFactory.create(processFunction, accesses, inputStream.getType(), resultType); - } - - DataStream getInputStream() { - return inputStream; - } - } - - /** - * A fluent builder that is used to define all the aspects of a transactional stream. - * - * @param input stream data type - */ - public static final class StreamBuilder { - private final InternalStreamBuilder builder; - - StreamBuilder(InternalStreamBuilder streamBuilder) { - this.builder = streamBuilder; - } - - public StreamBuilderWithProcessFunction apply(TransactionProcessFunction fn) { - @SuppressWarnings("unchecked") - InternalStreamBuilder b = (InternalStreamBuilder) builder; - return new StreamBuilderWithProcessFunction<>(b.withProcessFunction(fn)); - } - } - - /** - * A fluent builder that is used to define all the aspects of a transactional stream. - * - * @param input stream data type - * @param output stream data type - */ - public static final class StreamBuilderWithProcessFunction { - private final InternalStreamBuilder builder; - - StreamBuilderWithProcessFunction(InternalStreamBuilder builder) { - this.builder = builder; - } - - /** - * Adds another state access to the transactional operation. A state access is needed for - * each "row" or "entry" in the state that the transactional operation wants to modify. - * - *

For example, if the transactional operation modifies two entries in a streaming ledger state, - * two state accesses must be specified. - * - * @param bindName The name under which this specific state access is passed to the - * {@link TransactionProcessFunction}'s process method. - * @param state The streaming ledger state to access. - * @param keyAccess The function that gets the key for this state access from the - * transaction data event. - * @param accessType The access type, whether state is only read, only written, or both. - * @return This instance, to allow function chaining. - */ - public StreamBuilderWithProcessFunction on( - State state, - KeySelector keyAccess, - String bindName, - AccessType accessType) { - builder.withAccess(new StateAccessSpec<>(bindName, state, keyAccess, accessType)); - return this; - } - - public OutputTag output() { - TypeInformation outType = requireNonNull(builder.resultType); - return new OutputTag<>(builder.name, outType); - } - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/TransactionProcessFunction.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/TransactionProcessFunction.java deleted file mode 100644 index 3eef8a5..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/TransactionProcessFunction.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.api; - -import org.apache.flink.api.common.functions.Function; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * TransactionProcessFunction. - * - * @param The type of the transaction input events. - * @param The type of the transaction result events. - */ -public abstract class TransactionProcessFunction implements Function { - - // stable serialVersionUID for compatibility across code updates - private static final long serialVersionUID = 1L; - - // ------------------------------------------------------------------------ - - /** - * This interface marks the so called {@code 'process method'}, which processes the - * transaction with the transaction event and the involved states. - * - *

This interface is declared within the TransactionProcessFunction class so that - * users do not need to import anything when writing their process method. - */ - @Target(ElementType.METHOD) - @Retention(RetentionPolicy.RUNTIME) - public @interface ProcessTransaction { - } - - // ------------------------------------------------------------------------ - - /** - * Annotation for state access parameters in the process method. - * - *

This interface is declared within the TransactionProcessFunction class so that - * users do not need to import anything when writing their process method. - */ - @Target(ElementType.PARAMETER) - @Retention(RetentionPolicy.RUNTIME) - public @interface State { - - /** - * The value is the name of the state access. - */ - String value(); - - } - - // ------------------------------------------------------------------------ - - /** - * Transaction Processing Context. - */ - public interface Context { - - /** - * Emits a transaction result. - */ - void emit(T record); - - /** - * Abort the current transaction. - * - *

Calling abort will undo any change made to a {@link StateAccess}, and will not cause a transaction result - * to be emitted. - */ - void abort(); - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/package-info.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/package-info.java deleted file mode 100644 index 4726505..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/api/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * The API package for the dA-StreamingLedger support - * for Apache Flink. - */ - -package com.dataartisans.streamingledger.sdk.api; diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/reflection/ByteBuddyProcessFunctionInvoker.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/reflection/ByteBuddyProcessFunctionInvoker.java deleted file mode 100644 index 18d7f0e..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/reflection/ByteBuddyProcessFunctionInvoker.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.common.reflection; - -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction.ProcessTransaction; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpec; -import net.bytebuddy.ByteBuddy; -import net.bytebuddy.NamingStrategy; -import net.bytebuddy.TypeCache; -import net.bytebuddy.TypeCache.Sort; -import net.bytebuddy.description.method.MethodDescription; -import net.bytebuddy.description.method.MethodDescription.ForLoadedConstructor; -import net.bytebuddy.description.method.MethodDescription.ForLoadedMethod; -import net.bytebuddy.description.modifier.FieldManifestation; -import net.bytebuddy.description.modifier.Visibility; -import net.bytebuddy.description.type.TypeDefinition; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.description.type.TypeDescription.Generic; -import net.bytebuddy.dynamic.DynamicType; -import net.bytebuddy.dynamic.DynamicType.Builder; -import net.bytebuddy.dynamic.DynamicType.Unloaded; -import net.bytebuddy.dynamic.loading.ClassLoadingStrategy.Default; -import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; -import net.bytebuddy.implementation.FieldAccessor; -import net.bytebuddy.implementation.MethodCall; -import net.bytebuddy.implementation.bytecode.assign.Assigner; -import net.bytebuddy.implementation.bytecode.assign.Assigner.Typing; -import net.bytebuddy.matcher.ElementMatchers; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Modifier; -import java.util.Iterator; -import java.util.concurrent.atomic.AtomicLong; - -import static java.util.Objects.requireNonNull; - -/** - * A {@link ByteBuddy} backed factory of {@link ProcessFunctionInvoker}. - */ -public class ByteBuddyProcessFunctionInvoker { - - private static final TypeCache> CACHE = new TypeCache<>(Sort.SOFT); - - public static ProcessFunctionInvoker create(StreamingLedgerSpec spec) { - final ClassLoader userClassLoader = classLoader(spec); - final Class generatedClass = CACHE.findOrInsert(userClassLoader, spec, () -> generateAndLoadClass(spec)); - - try { - return createInstance(generatedClass, spec); - } - catch (NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) { - throw new RuntimeException("Unable to create a new instance for " + spec.processFunction, e); - } - } - - private static Class> generateAndLoadClass( - StreamingLedgerSpec spec) throws NoSuchMethodException { - Unloaded unloaded = createDynamicTypeFromSpec(spec); - return loadClass(unloaded, classLoader(spec)); - } - - private static ProcessFunctionInvoker createInstance( - Class generatedClass, - StreamingLedgerSpec spec) - throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { - - Constructor constructor = generatedClass.getDeclaredConstructor(spec.processFunction.getClass()); - constructor.setAccessible(true); - - @SuppressWarnings("unchecked") - ProcessFunctionInvoker instance = (ProcessFunctionInvoker) - constructor.newInstance(spec.processFunction); - - return instance; - } - - private static DynamicType.Unloaded createDynamicTypeFromSpec(StreamingLedgerSpec spec) - throws NoSuchMethodException { - PackageLocalNamingStrategy generatedTypeName = new PackageLocalNamingStrategy(spec.processFunction.getClass()); - - TypeDefinition generatedType = Generic.Builder.parameterizedType( - ProcessFunctionInvoker.class, - spec.inputType.getTypeClass(), - spec.resultType.getTypeClass() - ).build(); - - TypeDefinition processFunctionType = new TypeDescription.ForLoadedType(spec.processFunction.getClass()); - ForLoadedConstructor superTypeConstructor = - new ForLoadedConstructor(ProcessFunctionInvoker.class.getDeclaredConstructor()); - MethodDescription processMethodType = processMethodTypeFromSpec(spec); - - Builder builder = configureByteBuddyBuilder( - generatedTypeName, - generatedType, - processFunctionType, - superTypeConstructor, - processMethodType, - spec.stateBindings.size()); - - return builder.make(); - } - - private static Builder configureByteBuddyBuilder( - PackageLocalNamingStrategy generatedTypeName, - TypeDefinition generatedType, - TypeDefinition processFunctionType, - ForLoadedConstructor superTypeConstructor, - MethodDescription processMethodType, - int numberOfStateBindings) { - - return new ByteBuddy() - // final class extends { - .with(generatedTypeName) - .subclass(generatedType, ConstructorStrategy.Default.NO_CONSTRUCTORS).modifiers(Modifier.FINAL) - // private final delegate; - .defineField("delegate", processFunctionType, Visibility.PRIVATE, FieldManifestation.FINAL) - // public ( delegate) { - // super(); - // this.delegate = delegate; - // } - .defineConstructor(Modifier.PUBLIC) - .withParameters(processFunctionType) - .intercept(MethodCall.invoke(superTypeConstructor) - .andThen(FieldAccessor.ofField("delegate").setsArgumentAt(0)) - ) - // invoke(input, context, StateAccess[] arguments) { - // this.delegate.invoke(input, context, arguments[0], arguments[1], .. arguments[n - 1]); - // } - .method(ElementMatchers.named("invoke")) - .intercept(MethodCall.invoke(processMethodType) - .onField("delegate") - .withArgument(0, 1) // event & context - .withArgumentArrayElements(2, numberOfStateBindings) // StateAccess - .withAssigner(Assigner.DEFAULT, Typing.STATIC) - ); - } - - private static MethodDescription processMethodTypeFromSpec(StreamingLedgerSpec spec) { - Class processClass = spec.processFunction.getClass(); - Iterator methods = Methods.findAnnotatedMethods(processClass, ProcessTransaction.class); - if (!methods.hasNext()) { - throw new IllegalArgumentException("Unable to find an annotated method on " + processClass.getSimpleName()); - } - Method method = methods.next(); - if (methods.hasNext()) { - throw new IllegalArgumentException( - "Was expecting a single method annotated with a ProcessTransaction, but found more."); - } - return new ForLoadedMethod(method); - } - - @SuppressWarnings("unchecked") - private static Class> loadClass( - DynamicType.Unloaded unloaded, - ClassLoader classLoader) { - return (Class>) - unloaded - .load(classLoader, Default.INJECTION) - .getLoaded(); - } - - private static ClassLoader classLoader(StreamingLedgerSpec spec) { - final Class userClass = spec.processFunction.getClass(); - return userClass.getClassLoader(); - } - - /** - * A naming strategy for generated classes. - * - *

The following name format is produced: user-package . superClass $ user-className $ sequence number - */ - private static final class PackageLocalNamingStrategy extends NamingStrategy.AbstractBase { - private static final AtomicLong sequence = new AtomicLong(); - private final String packageName; - private final String className; - - PackageLocalNamingStrategy(Class superType) { - requireNonNull(superType); - Package aPackage = superType.getPackage(); - if (aPackage == null) { - this.packageName = ""; - } - else { - this.packageName = aPackage.getName(); - } - this.className = superType.getSimpleName(); - } - - @Override - protected String name(TypeDescription superClass) { - StringBuilder sb = new StringBuilder(); - if (!packageName.isEmpty()) { - sb.append(packageName); - sb.append('.'); - } - sb.append(superClass.getSimpleName()); - sb.append("$"); - sb.append(className); - sb.append("$"); - sb.append(sequence.incrementAndGet()); - return sb.toString(); - } - } - -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/reflection/Methods.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/reflection/Methods.java deleted file mode 100644 index 458407a..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/reflection/Methods.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.common.reflection; - -import java.lang.annotation.Annotation; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.Iterator; -import java.util.stream.Stream; - -/** - * Reflection utility functions. - */ -public final class Methods { - private Methods() { - } - - /** - * Finds all the {@link Method}s that are annotated with the supplied annotation. - * - *

This method will traverse up the superclass hierarchy looking for methods annotated with the supplied - * annotation. - * - * @return an iterator of {@link Method}s found. - */ - public static Iterator findAnnotatedMethods(Class javaClass, Class annotation) { - return definedMethods(javaClass) - .filter(method -> method.getAnnotation(annotation) != null) - .iterator(); - } - - private static Stream definedMethods(Class javaClass) { - if (javaClass == null || javaClass == Object.class) { - return Stream.empty(); - } - Stream selfMethods = Arrays.stream(javaClass.getDeclaredMethods()); - Stream superMethods = definedMethods(javaClass.getSuperclass()); - - return Stream.concat(selfMethods, superMethods); - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/reflection/ProcessFunctionInvoker.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/reflection/ProcessFunctionInvoker.java deleted file mode 100644 index 1bf1fc4..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/reflection/ProcessFunctionInvoker.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.common.reflection; - -import com.dataartisans.streamingledger.sdk.api.StateAccess; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction.Context; - -/** - * An auto-generated class that invokes a user provided {@link TransactionProcessFunction}. - * - * @param transaction event type. - * @param transaction result type. - */ -public abstract class ProcessFunctionInvoker { - - public abstract void invoke(InT input, Context context, StateAccess[] arguments); -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/TaggedElement.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/TaggedElement.java deleted file mode 100644 index 5cea202..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/TaggedElement.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.common.union; - -import java.util.List; -import java.util.Objects; - -/** - * A simple POJO with a dataStreamTag and an element. - * - *

see {@link Union#apply(List)}. - */ -public final class TaggedElement { - - public static final int UNDEFINED_TAG = -1; - - /** - * an index of the original data stream before the union. - */ - private int dataStreamTag; - - /** - * an element from one of the original data streams. - */ - private Object element; - - public TaggedElement(int dataStreamTag, Object element) { - this.dataStreamTag = dataStreamTag; - this.element = element; - } - - public int getDataStreamTag() { - return dataStreamTag; - } - - public void setDataStreamTag(int dataStreamTag) { - this.dataStreamTag = dataStreamTag; - } - - public Object getElement() { - return element; - } - - public void setElement(Object element) { - this.element = element; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - TaggedElement that = (TaggedElement) o; - return dataStreamTag == that.dataStreamTag - && Objects.equals(element, that.element); - } - - @Override - public int hashCode() { - int result = 1; - result = 31 * result + (element == null ? 0 : element.hashCode()); - result = 31 * result + Integer.hashCode(dataStreamTag); - return result; - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/Union.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/Union.java deleted file mode 100644 index 71c667b..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/Union.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.common.union; - -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.datastream.DataStream; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; - -import static org.apache.flink.util.Preconditions.checkArgument; - -/** - * Union differently typed {@link DataStream}s. - */ -public final class Union { - - private Union() { - } - - /** - * Union differently typed {@link DataStream}s into single {@code DataStream}. - * - *

The resulting {@code DataStream} is of type {@link TaggedElement} where - * {@link TaggedElement#getDataStreamTag()} corresponds to the list position of the source {@code DataStream} in - * {@code inputs} that produced that element, and {@link TaggedElement#getElement()} is the element produced. - * - * @param inputs the input data streams to union. - * @return a {@code DataStream} that corresponds to the union of all the input {@link DataStream}s - */ - public static DataStream apply(List> inputs) { - checkArgument(!inputs.isEmpty(), "union requires at least one input data stream."); - - List> taggedInputs = tagInputStreams(inputs); - if (taggedInputs.size() == 1) { - return taggedInputs.get(0); - } - DataStream first = taggedInputs.get(0); - List> restList = taggedInputs.subList(1, taggedInputs.size()); - - @SuppressWarnings({"unchecked", "raw"}) - DataStream[] restArray = (DataStream[]) new DataStream[restList.size()]; - DataStream[] rest = restList.toArray(restArray); - return first.union(rest); - } - - // ------------------------------------------------------------------------------------------------------- - // Internal Helpers - // ------------------------------------------------------------------------------------------------------- - - private static List> tagInputStreams(List> inputs) { - TypeInformation typeInfo = createUnionTypeInfo(inputs); - - List> taggedInputs = new ArrayList<>(); - int dataStreamIndex = 0; - for (DataStream input : inputs) { - - final DataStream transformed = input - .map(new TaggingMap<>(dataStreamIndex)) - .returns(typeInfo); - - dataStreamIndex++; - taggedInputs.add(transformed); - } - return taggedInputs; - } - - private static UnionTypeInfo createUnionTypeInfo(List> inputs) { - List> underlyingTypes = inputs.stream() - .map(DataStream::getType) - .collect(Collectors.toList()); - - return new UnionTypeInfo(underlyingTypes); - } - - // ------------------------------------------------------------------------------------------------------- - // Nested Class - // ------------------------------------------------------------------------------------------------------- - - private static final class TaggingMap extends RichMapFunction { - - private static final long serialVersionUID = 1; - - private final int dataStreamIndex; - private transient TaggedElement union; - - TaggingMap(int dataStreamIndex) { - this.dataStreamIndex = dataStreamIndex; - } - - @Override - public TaggedElement map(InT element) { - union.setElement(element); - return union; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.union = new TaggedElement(this.dataStreamIndex, null); - } - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/UnionSerializer.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/UnionSerializer.java deleted file mode 100644 index 112adc8..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/UnionSerializer.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.common.union; - -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; -import org.apache.flink.api.common.typeutils.TypeDeserializer; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static java.util.Objects.requireNonNull; -import static org.apache.flink.api.common.typeutils.CompatibilityResult.requiresMigration; -import static org.apache.flink.shaded.curator.org.apache.curator.shaded.com.google.common.base.Preconditions.checkArgument; - -final class UnionSerializer extends TypeSerializer { - - private static final long serialVersionUID = 1; - - private final TypeSerializer[] underlyingSerializers; - - private transient Object[] reusableObjects; - - UnionSerializer(List> underlyingSerializers) { - requireNonNull(underlyingSerializers); - checkArgument(!underlyingSerializers.isEmpty(), "At least one underlying serializer is needed."); - this.underlyingSerializers = toArray(underlyingSerializers); - this.reusableObjects = createReusableObjects(this.underlyingSerializers); - } - - private static Object[] createReusableObjects(TypeSerializer[] underlyingSerializers) { - Object[] reusableObjects = new Object[underlyingSerializers.length]; - for (int i = 0; i < reusableObjects.length; i++) { - reusableObjects[i] = underlyingSerializers[i].createInstance(); - } - return reusableObjects; - } - - @SuppressWarnings("unchecked") - private static TypeSerializer[] toArray(List> underlyingSerializers) { - return underlyingSerializers.toArray(new TypeSerializer[0]); - } - - @Override - public boolean isImmutableType() { - for (TypeSerializer serializer : underlyingSerializers) { - if (!serializer.isImmutableType()) { - return false; - } - } - return true; - } - - @Override - public TypeSerializer duplicate() { - List> duplicates = new ArrayList<>(underlyingSerializers.length); - boolean stateful = false; - for (TypeSerializer serializer : underlyingSerializers) { - TypeSerializer duplicate = serializer.duplicate(); - if (duplicate != serializer) { - stateful = true; - } - duplicates.add(duplicate); - } - if (!stateful) { - return this; - } - return new UnionSerializer(duplicates); - } - - @Override - public TaggedElement createInstance() { - return new TaggedElement(TaggedElement.UNDEFINED_TAG, null); - } - - @Override - public TaggedElement copy(TaggedElement from) { - final int tag = from.getDataStreamTag(); - Object copyOf = underlyingSerializers[tag].copy(from.getElement()); - return new TaggedElement(from.getDataStreamTag(), copyOf); - } - - @Override - public TaggedElement copy(TaggedElement from, TaggedElement reuse) { - final int tag = from.getDataStreamTag(); - final TypeSerializer serializer = underlyingSerializers[tag]; - final Object elementCopy = serializer.copy(from.getElement(), reusableObjects[tag]); - - reuse.setElement(elementCopy); - reuse.setDataStreamTag(tag); - return reuse; - } - - @Override - public int getLength() { - return -1; - } - - @Override - public void serialize(TaggedElement record, DataOutputView target) throws IOException { - final int tag = record.getDataStreamTag(); - target.writeInt(tag); - underlyingSerializers[tag].serialize(record.getElement(), target); - } - - @Override - public TaggedElement deserialize(DataInputView source) throws IOException { - final int tag = source.readInt(); - Object value = underlyingSerializers[tag].deserialize(source); - return new TaggedElement(tag, value); - } - - @Override - public TaggedElement deserialize(TaggedElement reuse, DataInputView source) throws IOException { - final int tag = source.readInt(); - final TypeSerializer serializer = underlyingSerializers[tag]; - final Object element = serializer.deserialize(reusableObjects[tag], source); - - reuse.setDataStreamTag(tag); - reuse.setElement(element); - return reuse; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - final int tag = source.readInt(); - target.writeInt(tag); - underlyingSerializers[tag].copy(source, target); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - UnionSerializer that = (UnionSerializer) o; - return Arrays.equals(underlyingSerializers, that.underlyingSerializers); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof UnionSerializer; - } - - @Override - public int hashCode() { - return Arrays.hashCode(underlyingSerializers); - } - - // ----------------------------------------------------------------------------------- - // Internal helper methods - // ----------------------------------------------------------------------------------- - - @Override - public TypeSerializerConfigSnapshot snapshotConfiguration() { - return new UnionSerializerConfigSnapshot(Arrays.asList(underlyingSerializers)); - } - - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (!(configSnapshot instanceof UnionSerializerConfigSnapshot)) { - return requiresMigration(); - } - UnionSerializerConfigSnapshot config = (UnionSerializerConfigSnapshot) configSnapshot; - List, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs = - config.getNestedSerializersAndConfigs(); - - if (previousSerializersAndConfigs.size() != underlyingSerializers.length) { - return requiresMigration(); - } - - List> migratedSerializers = new ArrayList<>(); - // Adopted from the EitherSerializerConfigSnapshot. - // One (or more) of the serializers here is not compatible with one (or more) of the previous serializers. - boolean requiresMigration = false; - for (int i = 0; i < underlyingSerializers.length; i++) { - final TypeSerializer underlyingSerializer = underlyingSerializers[i]; - - final CompatibilityResult res = CompatibilityUtil.resolveCompatibilityResult( - previousSerializersAndConfigs.get(i).f0, - UnloadableDummyTypeSerializer.class, - previousSerializersAndConfigs.get(i).f1, - underlyingSerializer); - - if (!res.isRequiresMigration()) { - migratedSerializers.add(underlyingSerializer); - continue; - } - requiresMigration = true; - TypeDeserializer deserializer = res.getConvertDeserializer(); - if (deserializer == null) { - return requiresMigration(); - } - migratedSerializers.add(new TypeDeserializerAdapter<>(deserializer)); - } - UnionSerializer migratedSerializer = new UnionSerializer(migratedSerializers); - return requiresMigration ? requiresMigration(migratedSerializer) : CompatibilityResult.compatible(); - } - - private void readObject(java.io.ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - this.reusableObjects = createReusableObjects(this.underlyingSerializers); - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/UnionSerializerConfigSnapshot.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/UnionSerializerConfigSnapshot.java deleted file mode 100644 index 0e84998..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/UnionSerializerConfigSnapshot.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.common.union; - -import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import java.util.List; - -/** - * UnionSerializerConfigSnapshot - A serializer configuration snapshot for the {@link UnionSerializer}. - */ -public final class UnionSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { - - private static final int VERSION = 1; - - /** - * A default constructor is required by Flink's serialization framework. - */ - @SuppressWarnings("unused") - public UnionSerializerConfigSnapshot() { - } - - /** - * Creates a new instance of UnionSerializerConfigSnapshot. - */ - @SuppressWarnings("WeakerAccess") - public UnionSerializerConfigSnapshot(List> underlyingSerializers) { - super(toArray(underlyingSerializers)); - } - - private static TypeSerializer[] toArray(List> underlyingSerializers) { - final int n = underlyingSerializers.size(); - TypeSerializer[] rawSerializers = new TypeSerializer[n]; - return underlyingSerializers.toArray(rawSerializers); - } - - @Override - public int getVersion() { - return VERSION; - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/UnionTypeInfo.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/UnionTypeInfo.java deleted file mode 100644 index 23a8a8b..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/common/union/UnionTypeInfo.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.common.union; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import static java.util.Objects.requireNonNull; - -final class UnionTypeInfo extends TypeInformation implements Serializable { - - private static final long serialVersionUID = 1; - - private final List> underlyingTypes; - - UnionTypeInfo(List> underlyingTypes) { - requireNonNull(underlyingTypes); - this.underlyingTypes = underlyingTypes; - } - - @Override - public boolean isBasicType() { - return false; - } - - @Override - public boolean isTupleType() { - return false; - } - - @Override - public int getArity() { - return 1; - } - - @Override - public int getTotalFields() { - return 1; - } - - @Override - public Class getTypeClass() { - return TaggedElement.class; - } - - @Override - public boolean isKeyType() { - return false; - } - - @Override - public TypeSerializer createSerializer(ExecutionConfig config) { - List> underlyingSerializers = new ArrayList<>(underlyingTypes.size()); - for (TypeInformation underlyingType : underlyingTypes) { - TypeSerializer serializer = underlyingType.createSerializer(config); - underlyingSerializers.add(serializer); - } - return new UnionSerializer(underlyingSerializers); - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("UnionTaggedSerializer {"); - final int size = underlyingTypes.size(); - for (int i = 0; i < size; i++) { - sb.append(underlyingTypes.get(i).toString()); - if (i < size - 1) { - sb.append(", "); - } - } - sb.append(" }"); - return sb.toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - UnionTypeInfo that = (UnionTypeInfo) o; - return Objects.equals(underlyingTypes, that.underlyingTypes); - } - - @Override - public int hashCode() { - return Objects.hash(underlyingTypes); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof UnionTypeInfo; - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/InputAndSpec.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/InputAndSpec.java deleted file mode 100644 index d30894f..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/InputAndSpec.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.spi; - -import org.apache.flink.streaming.api.datastream.DataStream; - -import com.dataartisans.streamingledger.sdk.api.StreamingLedger; - -import java.util.List; - -import static java.util.Objects.requireNonNull; - -/** - * An input to {@link StreamingLedgerRuntimeProvider#translate(String, List)}. - * This would be created by {@link StreamingLedger#resultStreams()}. - */ -public final class InputAndSpec { - - public final DataStream inputStream; - - public final String streamName; - - public final StreamingLedgerSpec streamSpec; - - public InputAndSpec(DataStream inputStream, String streamName, StreamingLedgerSpec spec) { - this.inputStream = requireNonNull(inputStream); - this.streamName = requireNonNull(streamName); - this.streamSpec = requireNonNull(spec); - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerRuntimeLoader.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerRuntimeLoader.java deleted file mode 100644 index 544629a..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerRuntimeLoader.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.spi; - -import org.apache.flink.util.FlinkRuntimeException; - -import java.util.Iterator; -import java.util.ServiceLoader; -import java.util.concurrent.locks.ReentrantLock; - -import javax.annotation.Nullable; -import javax.annotation.concurrent.GuardedBy; - -/** - * A loader utility to find and load the {@link StreamingLedgerRuntimeProvider} to - * execute the program with. - */ -public class StreamingLedgerRuntimeLoader { - - /** - * The lock to make sure only one transaction runtime provider is ever loaded. - */ - private static final ReentrantLock LOCK = new ReentrantLock(); - - /** - * The transaction runtime provider, null if not yet loaded. - */ - @Nullable - private static StreamingLedgerRuntimeProvider runtimeProvider; - - /** - * Gets the runtime provider to execute the transactions. This method will load - * the runtime provider if it has not been loaded before. - * - *

If more than one runtime provider is found, this method throws an - * exception, because it cannot determine which provider to use. - */ - public static StreamingLedgerRuntimeProvider getRuntimeProvider() { - LOCK.lock(); - try { - if (runtimeProvider == null) { - runtimeProvider = loadRuntimeProvider(); - } - return runtimeProvider; - } - finally { - LOCK.unlock(); - } - } - - @GuardedBy("LOCK") - private static StreamingLedgerRuntimeProvider loadRuntimeProvider() { - try { - ServiceLoader serviceLoader = - ServiceLoader.load(StreamingLedgerRuntimeProvider.class); - - Iterator iter = serviceLoader.iterator(); - - // find the first service implementation - StreamingLedgerRuntimeProvider firstProvider; - if (iter.hasNext()) { - firstProvider = iter.next(); - } - else { - throw new FlinkRuntimeException("No StreamingLedgerRuntimeProvider found. " - + "Please make sure you have a transaction runtime implementation in the classpath."); - } - - // check if there is more than one service implementation - if (iter.hasNext()) { - String secondServiceName = "(could not load service implementation)"; - try { - secondServiceName = iter.next().getClass().getName(); - } - catch (Throwable ignored) { - } - - throw new FlinkRuntimeException("Ambiguous: Found more than one StreamingLedgerRuntimeProvider: " - + firstProvider.getClass().getName() + " and " + secondServiceName); - } - - return firstProvider; - } - catch (FlinkRuntimeException e) { - // simply propagate without further wrapping, for simplicity - throw e; - } - catch (Throwable t) { - throw new FlinkRuntimeException("Could not load StreamingLedgerRuntimeProvider", t); - } - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerRuntimeProvider.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerRuntimeProvider.java deleted file mode 100644 index bab19f9..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerRuntimeProvider.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.spi; - -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.ResultStreams; - -import java.util.List; - -/** - * The stream ledger runtime provider is responsible for taking a stream ledger - * program and creating the Flink streaming graph that should execute it. - * - *

The exact way of executing the distributed transactions is implementation specific. - * Some runtime providers instantiate a serial (trivially correct) execution model, some - * a sophisticated parallel data flow. - */ -public interface StreamingLedgerRuntimeProvider { - - ResultStreams translate(String name, List> streamLedgerSpecs); -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerSpec.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerSpec.java deleted file mode 100644 index 5c4ee0b..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerSpec.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.spi; - -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import com.dataartisans.streamingledger.sdk.api.StreamingLedger; -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.StateAccessSpec; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -import static java.util.Objects.requireNonNull; - -/** - * A {@code StreamingLedgerSpec} contains all the necessary information gathered at - * {@link StreamingLedger#usingStream} to execute the process function at runtime. - * - * @param The type of the transaction data events. - * @param The type of the transaction data results. - */ -public final class StreamingLedgerSpec implements Serializable { - - private static final long serialVersionUID = 1; - - // ------------------------------------------------------------------------ - // Properties - // ------------------------------------------------------------------------ - - public final TransactionProcessFunction processFunction; - public final String processMethodName; - public final List> stateBindings; - public final TypeInformation inputType; - public final TypeInformation resultType; - - StreamingLedgerSpec( - TransactionProcessFunction processFunction, - String processMethodName, - List> stateBindings, - TypeInformation inputType, - TypeInformation resultType) { - this.processFunction = requireNonNull(processFunction); - this.processMethodName = requireNonNull(processMethodName); - this.stateBindings = Collections.unmodifiableList(new ArrayList<>(stateBindings)); - this.inputType = requireNonNull(inputType); - this.resultType = requireNonNull(resultType); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - StreamingLedgerSpec that = (StreamingLedgerSpec) o; - return Objects.equals(processFunction, that.processFunction) - && Objects.equals(processMethodName, that.processMethodName) - && Objects.equals(stateBindings, that.stateBindings) - && Objects.equals(inputType, that.inputType) - && Objects.equals(resultType, that.resultType); - } - - @Override - public int hashCode() { - return Objects.hash(processFunction, processMethodName, stateBindings, inputType, resultType); - } -} diff --git a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerSpecFactory.java b/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerSpecFactory.java deleted file mode 100644 index 493a89a..0000000 --- a/da-streamingledger-sdk/src/main/java/com/dataartisans/streamingledger/sdk/spi/StreamingLedgerSpecFactory.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.spi; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import com.dataartisans.streamingledger.sdk.api.StateAccess; -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.StateAccessSpec; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction.Context; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction.ProcessTransaction; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction.State; -import com.dataartisans.streamingledger.sdk.common.reflection.Methods; - -import java.lang.reflect.Method; -import java.lang.reflect.Parameter; -import java.lang.reflect.ParameterizedType; -import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.function.Function; - -import static java.util.Objects.requireNonNull; - -/** - * A {@link StreamingLedgerSpec} factory. - */ -@Internal -public final class StreamingLedgerSpecFactory { - - private StreamingLedgerSpecFactory() { - } - - /** - * Creates a new TransactionOperationSpec. - * - * @param The type of the transaction data events. - * @param The type of the transaction data results. - */ - public static StreamingLedgerSpec create( - TransactionProcessFunction processFunction, - List> stateSpecs, - TypeInformation inputType, - TypeInformation outputType) { - - requireNonNull(processFunction); - requireNonNull(stateSpecs); - Iterator annotatedMethods = Methods.findAnnotatedMethods( - processFunction.getClass(), - ProcessTransaction.class); - - if (!annotatedMethods.hasNext()) { - throw missingAnnotation(processFunction); - } - final Method method = annotatedMethods.next(); - if (annotatedMethods.hasNext()) { - throw tooManyAnnotatedMethods(processFunction); - } - final Parameter[] parameters = method.getParameters(); - if (parameters.length != stateSpecs.size() + 2) { - throw wrongParameterCount(processFunction, method.getName(), inputType.getTypeClass(), stateSpecs); - } - // first parameter is the input - if (!isOfSimpleType(parameters[0], inputType)) { - throw wrongParameter(processFunction, method.getName(), parameters[0], "wrong type."); - } - // second parameter is Context - if (!isOfGenericType(parameters[1], Context.class, outputType)) { - throw wrongParameter(processFunction, method.getName(), parameters[1], "wrong type."); - } - // the rest of the parameters are StateAccess annotated with the @State annotation. - // The annotation connects a parameter to the StateAccessSpec via bindName. - Map> bindName2StateSpec = uniqueIndex(stateSpecs, s -> s.bindName); - List> referencedStateSpecs = new ArrayList<>(); - for (int i = 2; i < parameters.length; i++) { - final Parameter parameter = parameters[i]; - final State state = parameter.getAnnotation(State.class); - if (state == null) { - throw wrongParameter( - processFunction, - method.getName(), - parameters[i], - "not annotated with a @State."); - } - StateAccessSpec stateAccess = bindName2StateSpec.get(state.value()); - if (stateAccess == null) { - throw wrongParameter( - processFunction, - method.getName(), - parameters[i], - "unknown state spec '" + state.value() + "'"); - } - // parameter: StateAccess - if (!isOfGenericType(parameter, StateAccess.class, stateAccess.state.getValueType())) { - throw wrongParameter( - processFunction, - method.getName(), - parameters[i], - "state spec '" + state.value() + "' has a value type " - + stateAccess.state.getValueType()); - } - referencedStateSpecs.add(stateAccess); - } - - return new StreamingLedgerSpec<>( - processFunction, - method.getName(), - referencedStateSpecs, - inputType, - outputType); - } - - - // ------------------------------------------------------------------------ - // Static helpers - // ------------------------------------------------------------------------ - - private static IllegalArgumentException missingAnnotation(Object subject) { - String className = subject.getClass().getSimpleName(); - String annotation = ProcessTransaction.class.getSimpleName(); - return new IllegalArgumentException("Could not find any method of " + className + " that is annotated with @" - + annotation - ); - } - - private static IllegalArgumentException tooManyAnnotatedMethods(Object subject) { - String className = subject.getClass().getSimpleName(); - String annotation = ProcessTransaction.class.getSimpleName(); - return new IllegalArgumentException("There multiple methods of " + className + " that are annotated with " - + "@" + annotation - ); - } - - private static IllegalArgumentException wrongParameter( - Object subject, - String methodName, - Parameter parameter, - String message) { - String className = subject.getClass().getSimpleName(); - return new IllegalArgumentException("A problem with the field " + parameter + " of " + className + "." - + methodName + "\t" + message - ); - } - - private static IllegalArgumentException wrongParameterCount( - Object subject, - String methodName, - Class typeClass, - List> stateSpecs) { - String className = subject.getClass().getSimpleName(); - return new IllegalArgumentException(className + "." + methodName + " has wrong argument count." - + " Expected: " + methodName + "(" + typeClass + ", Context<" + typeClass + ">, ... " - + stateSpecs.size() + " state accesses" - ); - } - - private static boolean isOfSimpleType(Parameter parameter, TypeInformation type) { - return parameter.getType() == type.getTypeClass(); - } - - @SuppressWarnings("BooleanMethodIsAlwaysInverted") - private static boolean isOfGenericType(Parameter parameter, Class baseType, TypeInformation type) { - if (!(parameter.getParameterizedType() instanceof ParameterizedType)) { - return false; - } - ParameterizedType parameterizedType = (ParameterizedType) parameter.getParameterizedType(); - if (parameterizedType.getRawType() != baseType) { - return false; - } - Type t = parameterizedType.getActualTypeArguments()[0]; - return t == type.getTypeClass(); - } - - private static Map uniqueIndex(Iterable elements, Function indexExtractor) { - Map key2element = new HashMap<>(); - for (E element : elements) { - K key = indexExtractor.apply(element); - key2element.put(key, element); - } - return key2element; - } -} diff --git a/da-streamingledger-sdk/src/test/java/com/dataartisans/streamingledger/sdk/api/StreamingLedgerSpecTest.java b/da-streamingledger-sdk/src/test/java/com/dataartisans/streamingledger/sdk/api/StreamingLedgerSpecTest.java deleted file mode 100644 index 52c55a3..0000000 --- a/da-streamingledger-sdk/src/test/java/com/dataartisans/streamingledger/sdk/api/StreamingLedgerSpecTest.java +++ /dev/null @@ -1,266 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.api; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.State; -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.StateAccessSpec; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpec; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpecFactory; -import org.junit.Test; - -import java.util.Collections; -import java.util.List; - -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.MatcherAssert.assertThat; - -/** - * Test {@link StreamingLedgerSpec}. - */ -public class StreamingLedgerSpecTest { - - private static StreamingLedgerSpec createSpecificationUnderTest( - TransactionProcessFunction processFunction, - TypeInformation inType, - TypeInformation outType) { - - State state = new State<>( - "state", - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - - StateAccessSpec accessSpec = new StateAccessSpec<>( - "value", - state, - Object::hashCode, - AccessType.READ_WRITE); - - List> bindings = Collections.singletonList(accessSpec); - return StreamingLedgerSpecFactory.create(processFunction, bindings, inType, outType); - } - - @Test - public void example() { - - final class TxnFn extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process(String input, Context out, @State("value") StateAccess value) { - Long v = value.read(); - out.emit(v); - } - } - - StreamingLedgerSpec spec = createSpecificationUnderTest( - new TxnFn(), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - - assertThat(spec.processMethodName, is("process")); - } - - @Test(expected = IllegalArgumentException.class) - public void missingProcessTransactionAnnotation() { - - final class MissingProcessTxnAnnotation extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - // Ooops: @ProcessTransaction - public void process(Boolean input, Context out, @State("value") StateAccess value) { - Long v = value.read(); - out.emit(v); - } - } - - createSpecificationUnderTest( - new MissingProcessTxnAnnotation(), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - } - - @Test(expected = IllegalArgumentException.class) - public void twoFunctionsAnnotated() { - - final class TwoFunctionsAnnotated extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process1(Boolean input, Context out, @State("value") StateAccess value) { - Long v = value.read(); - out.emit(v); - } - - @ProcessTransaction - public void process2(Boolean input, Context out, @State("value") StateAccess value) { - Long v = value.read(); - out.emit(v); - } - } - - createSpecificationUnderTest( - new TwoFunctionsAnnotated(), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - } - - @Test - public void subClassingWorks() { - - class ParentClass extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process(String input, Context out, @State("value") StateAccess value) { - Long v = value.read(); - out.emit(v); - } - - } - - final class SubClass extends ParentClass { - } - - createSpecificationUnderTest( - new SubClass(), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - } - - @Test(expected = IllegalArgumentException.class) - public void wrongInputParameter() { - - final class WrongInputParameter extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process(Boolean input, Context out, @State("value") StateAccess value) { - Long v = value.read(); - out.emit(v); - } - } - - createSpecificationUnderTest( - new WrongInputParameter(), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - } - - @Test(expected = IllegalArgumentException.class) - public void wrongContextTypeParameter() { - - final class WrongContextParameter extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process(String input, Context out, @State("value") StateAccess value) { - out.emit(true); - } - } - - createSpecificationUnderTest( - new WrongContextParameter(), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - } - - @Test(expected = IllegalArgumentException.class) - public void unknownStateBindingName() { - - final class UnknownStateBindingName extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process(String input, Context out, @State("xyz") StateAccess value) { - out.emit(1L); - } - } - - createSpecificationUnderTest( - new UnknownStateBindingName(), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - } - - @Test(expected = IllegalArgumentException.class) - public void wrongStateAccessTypeParameter() { - - final class UnknownStateBindingName extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process(String input, Context out, @State("value") StateAccess value) { - out.emit(1L); - } - } - - createSpecificationUnderTest( - new UnknownStateBindingName(), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - } - - @Test(expected = IllegalArgumentException.class) - public void stateBindingIsNotAnnotatedWithStateAnnotation() { - - final class MissingStateAnnotation extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process(String input, Context out, StateAccess value) { - out.emit(1L); - } - } - - createSpecificationUnderTest( - new MissingStateAnnotation(), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - } - - @Test(expected = IllegalArgumentException.class) - public void missingStateAccess() { - - final class MissingStateAccess extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process(String input, Context out, @State("value") Long value) { - out.emit(1L); - } - } - - createSpecificationUnderTest( - new MissingStateAccess(), - BasicTypeInfo.STRING_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - } - -} diff --git a/da-streamingledger-sdk/src/test/java/com/dataartisans/streamingledger/sdk/common/reflection/ByteBuddyProcessFunctionInvokerTest.java b/da-streamingledger-sdk/src/test/java/com/dataartisans/streamingledger/sdk/common/reflection/ByteBuddyProcessFunctionInvokerTest.java deleted file mode 100644 index c7ee7d9..0000000 --- a/da-streamingledger-sdk/src/test/java/com/dataartisans/streamingledger/sdk/common/reflection/ByteBuddyProcessFunctionInvokerTest.java +++ /dev/null @@ -1,161 +0,0 @@ - -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.common.reflection; - -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; - -import com.dataartisans.streamingledger.sdk.api.AccessType; -import com.dataartisans.streamingledger.sdk.api.StateAccess; -import com.dataartisans.streamingledger.sdk.api.StateAccessException; -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.State; -import com.dataartisans.streamingledger.sdk.api.StreamingLedger.StateAccessSpec; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction; -import com.dataartisans.streamingledger.sdk.api.TransactionProcessFunction.Context; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpec; -import com.dataartisans.streamingledger.sdk.spi.StreamingLedgerSpecFactory; -import org.junit.Before; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.MatcherAssert.assertThat; - -/** - * Test {@link ByteBuddyProcessFunctionInvoker}. - */ -public class ByteBuddyProcessFunctionInvokerTest { - - // -------------------------------------------------------------------------------------------------------------- - // Transaction Function. - // -------------------------------------------------------------------------------------------------------------- - - private ListContext context; - - // -------------------------------------------------------------------------------------------------------------- - // Test Setup. - // -------------------------------------------------------------------------------------------------------------- - private StateAccessStub[] arguments; - private StreamingLedgerSpec specification; - - private static StreamingLedgerSpec createSpecificationUnderTest() { - State state = new State<>( - "state", - BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.LONG_TYPE_INFO); - - StateAccessSpec accessSpec = new StateAccessSpec<>( - "value", - state, - String::hashCode, - AccessType.READ_WRITE); - - List> bindings = Collections.singletonList(accessSpec); - TypeInformation inType = BasicTypeInfo.STRING_TYPE_INFO; - TypeInformation outType = BasicTypeInfo.LONG_TYPE_INFO; - return StreamingLedgerSpecFactory.create(new ReadingTransactionFunction(), bindings, inType, outType); - } - - @Before - public void before() { - specification = createSpecificationUnderTest(); - context = new ListContext<>(); - arguments = new StateAccessStub[]{new StateAccessStub()}; - } - - // -------------------------------------------------------------------------------------------------------------- - // Tests. - // -------------------------------------------------------------------------------------------------------------- - - @Test - public void usageExample() { - ProcessFunctionInvoker generatedCodeThatInvokesUserCode = - ByteBuddyProcessFunctionInvoker.create(specification); - - arguments[0].value = Long.MAX_VALUE; - generatedCodeThatInvokesUserCode.invoke("does not matter", context, arguments); - - assertThat(context.emitted, hasItem(Long.MAX_VALUE)); - } - - // -------------------------------------------------------------------------------------------------------------- - // Test Utils. - // -------------------------------------------------------------------------------------------------------------- - - /** - * ReadingTransactionFunction - simulates a user provided transaction function, that has a single state access. - * This user code will read the provided value and emit it. See: {@link #process(String, Context, StateAccess)}. - */ - private static class ReadingTransactionFunction extends TransactionProcessFunction { - - private static final long serialVersionUID = 1; - - @ProcessTransaction - public void process(String input, Context out, @State("value") StateAccess value) { - Long v = value.read(); - out.emit(v); - } - } - - private final class StateAccessStub implements StateAccess { - T value; - - @Override - public T read() throws StateAccessException { - return value; - } - - @Override - public void write(T newValue) throws StateAccessException { - value = newValue; - } - - @Override - public void delete() throws StateAccessException { - value = null; - } - - @Override - public String getStateName() { - throw new UnsupportedOperationException(); - } - - @Override - public String getStateAccessName() { - throw new UnsupportedOperationException(); - } - } - - private final class ListContext implements Context { - List emitted = new ArrayList<>(); - - @Override - public void emit(T record) { - emitted.add(record); - } - - @Override - public void abort() { - emitted.clear(); - } - } - -} diff --git a/da-streamingledger-sdk/src/test/java/com/dataartisans/streamingledger/sdk/common/union/UnionSerializerTest.java b/da-streamingledger-sdk/src/test/java/com/dataartisans/streamingledger/sdk/common/union/UnionSerializerTest.java deleted file mode 100644 index 6d4fa85..0000000 --- a/da-streamingledger-sdk/src/test/java/com/dataartisans/streamingledger/sdk/common/union/UnionSerializerTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2018 Data Artisans GmbH - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.dataartisans.streamingledger.sdk.common.union; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeutils.SerializerTestBase; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.util.XORShiftRandom; - -import java.util.ArrayList; -import java.util.List; - -/** - * Test {@link UnionSerializer}. - */ -public class UnionSerializerTest extends SerializerTestBase { - - @Override - protected TypeSerializer createSerializer() { - List> serializers = new ArrayList<>(); - ExecutionConfig config = new ExecutionConfig(); - - serializers.add(BasicTypeInfo.LONG_TYPE_INFO.createSerializer(config)); - serializers.add(BasicTypeInfo.STRING_TYPE_INFO.createSerializer(config)); - serializers.add(BasicTypeInfo.BOOLEAN_TYPE_INFO.createSerializer(config)); - - return new UnionSerializer(serializers); - } - - @Override - protected int getLength() { - return -1; - } - - @Override - protected Class getTypeClass() { - return TaggedElement.class; - } - - @Override - protected TaggedElement[] getTestData() { - XORShiftRandom random = new XORShiftRandom(); - - TaggedElement[] data = new TaggedElement[100]; - for (int i = 0; i < data.length; i++) { - final int tag = random.nextInt(3); - final Object element; - switch (tag) { - case 0: { - element = random.nextLong(); - break; - } - case 1: { - byte[] bytes = new byte[random.nextInt(256)]; - random.nextBytes(bytes); - element = new String(bytes); - break; - } - case 2: { - element = random.nextBoolean(); - break; - } - default: { - element = null; - } - } - data[i] = new TaggedElement(tag, element); - } - return data; - } -} diff --git a/logo.png b/logo.png deleted file mode 100644 index eb669c9..0000000 Binary files a/logo.png and /dev/null differ diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 41f52e1..0000000 --- a/pom.xml +++ /dev/null @@ -1,424 +0,0 @@ - - - - - 4.0.0 - - com.data-artisans.streamingledger - da-streamingledger - 1.1-SNAPSHOT - - da-streamingledger - pom - - Serializable ACID transactions on streaming data - http://data-artisans.com - - https://github.com/dataArtisans/da-streamingledger - - - - - jdoe - info@data-artisans.com - http://data-artisans.com/ - data Artisans - - - - - - - - - The Apache Software License, Version 2.0 - http://www.apache.org/licenses/LICENSE-2.0.txt - repo - - - - - - - UTF-8 - UTF-8 - 1.8 - 1.6.0 - 1.8.0 - 3.0.1 - - - - - - da-streamingledger-sdk - da-streamingledger-runtime-serial - da-streamingledger-examples - - - - - - - - - org.slf4j - slf4j-api - - - - com.google.code.findbugs - jsr305 - - - - - - junit - junit - 4.12 - jar - test - - - - - - org.slf4j - slf4j-log4j12 - jar - test - - - - log4j - log4j - jar - test - - - - - - - com.google.code.findbugs - jsr305 - 1.3.9 - - - org.slf4j - slf4j-api - 1.7.7 - - - org.slf4j - slf4j-log4j12 - 1.7.7 - - - log4j - log4j - 1.2.17 - - - - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.1 - - ${java.version} - ${java.version} - -Xlint:all - - - - - - org.apache.maven.plugins - maven-jar-plugin - 2.4 - - - - true - true - - - - - - - - org.apache.maven.plugins - maven-source-plugin - 2.2.1 - - - attach-sources - - jar - - - - - - org.apache.maven.plugins - maven-javadoc-plugin - 2.9.1 - - true - -Xdoclint:none - false - - - - attach-javadocs - - jar - - - - - - - - org.apache.maven.plugins - maven-surefire-plugin - - 2.18.1 - - 2C - - 0${surefire.forkNumber} - log4j-test.properties - - -Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber} -XX:+UseG1GC - - - - unit-tests - test - - test - - - - **/*Test.* - - true - - - - integration-tests - integration-test - - test - - - - **/*ITCase.* - - false - - - - - - - - org.apache.rat - apache-rat-plugin - 0.12 - false - - - verify - - check - - - - - false - false - 0 - - - Data Artisans GmbH - - - - - Apache License Version 2.0 - - - - README.md - **/.*/** - **/target/** - tools/maven/** - tools/intellij/** - **/.idea/** - **/*.iml - **/dependency-reduced-pom.xml - - - - - - - org.apache.maven.plugins - maven-checkstyle-plugin - 2.17 - - - com.puppycrawl.tools - checkstyle - 8.4 - - - - - validate - validate - - check - - - - - tools/maven/checkstyle.xml - tools/maven/checkstyle-suppressions.xml - true - true - true - - - - - - com.github.spotbugs - spotbugs-maven-plugin - 3.1.1 - - - com.github.spotbugs - spotbugs - 3.1.1 - - - - - analyze-compile - compile - - check - - - - - Max - Low - tools/maven/spotbugs-exclude.xml - - - - - - org.apache.maven.plugins - maven-enforcer-plugin - 3.0.0-M1 - - - enforce-min-versions - - enforce - - - - - [3.0.3,) - - - ${java.version} - - - - - - dependency-convergence - - enforce - - - - - - - - - - - - - - - - - ossrh - https://oss.sonatype.org/content/repositories/snapshots - - - ossrh - https://oss.sonatype.org/service/local/staging/deploy/maven2/ - - - - - - release - - - - org.apache.maven.plugins - maven-gpg-plugin - 1.5 - - - sign-artifacts - verify - - sign - - - - - - - - - - diff --git a/tools/intellij/codestyle-java.xml b/tools/intellij/codestyle-java.xml deleted file mode 100644 index 1a748b6..0000000 --- a/tools/intellij/codestyle-java.xml +++ /dev/null @@ -1,49 +0,0 @@ - - - - - - - \ No newline at end of file diff --git a/tools/intellij/inspections.xml b/tools/intellij/inspections.xml deleted file mode 100644 index b96ae52..0000000 --- a/tools/intellij/inspections.xml +++ /dev/null @@ -1,67 +0,0 @@ - - - - - \ No newline at end of file diff --git a/tools/maven/checkstyle-suppressions.xml b/tools/maven/checkstyle-suppressions.xml deleted file mode 100644 index 194cd5f..0000000 --- a/tools/maven/checkstyle-suppressions.xml +++ /dev/null @@ -1,26 +0,0 @@ - - - - - - - diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml deleted file mode 100644 index a1d70fe..0000000 --- a/tools/maven/checkstyle.xml +++ /dev/nulldiff --git a/tools/maven/spotbugs-exclude.xml b/tools/maven/spotbugs-exclude.xml deleted file mode 100644 index ed6b841..0000000 --- a/tools/maven/spotbugs-exclude.xml +++ /dev/null @@ -1,49 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - -