diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml new file mode 100644 index 0000000..e61c642 --- /dev/null +++ b/.github/workflows/build.yaml @@ -0,0 +1,17 @@ +name: Java Maven build + +on: [push] + +jobs: + publish: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-java@v2 + with: + java-version: '11' + distribution: 'adopt' + - name: Publish to GitHub Packages + run: mvn deploy + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} \ No newline at end of file diff --git a/README.md b/README.md index 96b446f..38fd1e3 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ This command will install all the components in your `.m2` directory. To use you ``` - net.galgus + tgamforks flink-connector-http 1.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index 560a619..d5e5a6a 100644 --- a/pom.xml +++ b/pom.xml @@ -4,9 +4,9 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - net.galgus + tgamforks flink-connector-http - 1.0-SNAPSHOT + 1.0 flink-connector-http @@ -37,4 +37,13 @@ + + + + github + GitHub globeandmail Apache Maven Packages + https://maven.pkg.github.com/globeandmail/flink-connector-http + + + \ No newline at end of file diff --git a/src/main/java/net/galgus/flink/streaming/connectors/http/HTTPSink.java b/src/main/java/net/galgus/flink/streaming/connectors/http/HTTPSink.java index 9f5ee60..27595bf 100644 --- a/src/main/java/net/galgus/flink/streaming/connectors/http/HTTPSink.java +++ b/src/main/java/net/galgus/flink/streaming/connectors/http/HTTPSink.java @@ -12,6 +12,7 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; public class HTTPSink extends RichSinkFunction { private static final Logger log = LoggerFactory.getLogger(HTTPSink.class); @@ -25,9 +26,10 @@ public HTTPSink(HTTPConnectionConfig httpConnectionConfig) { public void invoke(IN value, Context context) throws Exception { if (value != null) { URL url = new URL(httpConnectionConfig.getEndpoint()); - + + long start = System.nanoTime(); + HttpURLConnection conn = httpConnectionConfig.isHttpsEnabled() ? (HttpsURLConnection) url.openConnection() : (HttpURLConnection) url.openConnection(); - conn.setDoOutput(true); conn.setRequestMethod(httpConnectionConfig.getMethod()); @@ -58,6 +60,10 @@ public void invoke(IN value, Context context) throws Exception { + ", headers: " + httpConnectionConfig.getHeaders()); conn.disconnect(); + long elapsedNano = System.nanoTime() - start; + long elapsedTime = TimeUnit.NANOSECONDS.toMillis(elapsedNano); + log.info("Request from url = {}, with status = {} and message = {} in duration = {}ms", + conn.getURL(), status, conn.getResponseMessage(), Long.toString(elapsedTime)); } } }