diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
new file mode 100644
index 0000000..e61c642
--- /dev/null
+++ b/.github/workflows/build.yaml
@@ -0,0 +1,17 @@
+name: Java Maven build
+
+on: [push]
+
+jobs:
+ publish:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v2
+ - uses: actions/setup-java@v2
+ with:
+ java-version: '11'
+ distribution: 'adopt'
+ - name: Publish to GitHub Packages
+ run: mvn deploy
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
\ No newline at end of file
diff --git a/README.md b/README.md
index 96b446f..38fd1e3 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,7 @@ This command will install all the components in your `.m2` directory. To use you
```
- net.galgus
+ tgamforks
flink-connector-http
1.0-SNAPSHOT
diff --git a/pom.xml b/pom.xml
index 560a619..d5e5a6a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,9 +4,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- net.galgus
+ tgamforks
flink-connector-http
- 1.0-SNAPSHOT
+ 1.0
flink-connector-http
@@ -37,4 +37,13 @@
+
+
+
+ github
+ GitHub globeandmail Apache Maven Packages
+ https://maven.pkg.github.com/globeandmail/flink-connector-http
+
+
+
\ No newline at end of file
diff --git a/src/main/java/net/galgus/flink/streaming/connectors/http/HTTPSink.java b/src/main/java/net/galgus/flink/streaming/connectors/http/HTTPSink.java
index 9f5ee60..27595bf 100644
--- a/src/main/java/net/galgus/flink/streaming/connectors/http/HTTPSink.java
+++ b/src/main/java/net/galgus/flink/streaming/connectors/http/HTTPSink.java
@@ -12,6 +12,7 @@
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
public class HTTPSink extends RichSinkFunction {
private static final Logger log = LoggerFactory.getLogger(HTTPSink.class);
@@ -25,9 +26,10 @@ public HTTPSink(HTTPConnectionConfig httpConnectionConfig) {
public void invoke(IN value, Context context) throws Exception {
if (value != null) {
URL url = new URL(httpConnectionConfig.getEndpoint());
-
+
+ long start = System.nanoTime();
+
HttpURLConnection conn = httpConnectionConfig.isHttpsEnabled() ? (HttpsURLConnection) url.openConnection() : (HttpURLConnection) url.openConnection();
-
conn.setDoOutput(true);
conn.setRequestMethod(httpConnectionConfig.getMethod());
@@ -58,6 +60,10 @@ public void invoke(IN value, Context context) throws Exception {
+ ", headers: " + httpConnectionConfig.getHeaders());
conn.disconnect();
+ long elapsedNano = System.nanoTime() - start;
+ long elapsedTime = TimeUnit.NANOSECONDS.toMillis(elapsedNano);
+ log.info("Request from url = {}, with status = {} and message = {} in duration = {}ms",
+ conn.getURL(), status, conn.getResponseMessage(), Long.toString(elapsedTime));
}
}
}