diff --git a/clients-schema-registry/.gitignore b/clients-schema-registry/.gitignore new file mode 100644 index 0000000..eb5a316 --- /dev/null +++ b/clients-schema-registry/.gitignore @@ -0,0 +1 @@ +target diff --git a/clients-schema-registry/java/pom.xml b/clients-schema-registry/java/pom.xml new file mode 100644 index 0000000..3e07b90 --- /dev/null +++ b/clients-schema-registry/java/pom.xml @@ -0,0 +1,52 @@ + + + 4.0.0 + + org.apache.pulsar + schema-registry-demo + 1.0-SNAPSHOT + + + 17 + 17 + UTF-8 + + + + + io.streamnative + pulsar-client + 4.1.0-sr-SNAPSHOT + + + io.streamnative + schema-registry + 4.1.0-sr-SNAPSHOT + + + org.projectlombok + lombok + 1.18.32 + + + com.fasterxml.jackson.datatype + jackson-datatype-guava + 2.17.2 + + + + + + snapshot + https://s01.oss.sonatype.org/content/repositories/snapshots/ + + false + + + true + + + + diff --git a/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/SchemaRegistryJsonDemo.java b/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/SchemaRegistryJsonDemo.java new file mode 100644 index 0000000..54b119a --- /dev/null +++ b/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/SchemaRegistryJsonDemo.java @@ -0,0 +1,54 @@ +package org.apache.pulsar.sn; + +import org.apache.commons.collections4.map.HashedMap; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.sr.kafka.KafkaSchemaInfoProviderFactory; +import org.apache.pulsar.sr.kafka.KafkaSchemas; + +import java.util.Map; + +public class SchemaRegistryJsonDemo { + + public static void main(String[] args) throws Exception { + // set Kafka schema registry related configurations + Map srConfig = new HashedMap<>(); + srConfig.put("schema.registry.url", "http://localhost:8001"); + + PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .schemaInfoProviderFactory(new KafkaSchemaInfoProviderFactory(srConfig)) + .build(); + + String topic = "json-test"; + + Schema schema = KafkaSchemas.JSON(User.class); + + Producer producer = client.newProducer(schema) + .topic(topic) + .create(); + + Consumer consumer = client.newConsumer(schema) + .topic(topic) + .subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + for (int i = 0; i < 10; i++) { + producer.send(new User("name-" + i, 10 + i)); + } + + for (int i = 0; i < 10; i++) { + Message message = consumer.receive(); + consumer.acknowledge(message); + System.out.println("receive msg " + message.getValue()); + } + + client.close(); + } + +} diff --git a/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/SchemaRegistryKVDemo.java b/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/SchemaRegistryKVDemo.java new file mode 100644 index 0000000..5e12d17 --- /dev/null +++ b/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/SchemaRegistryKVDemo.java @@ -0,0 +1,56 @@ +package org.apache.pulsar.sn; + +import org.apache.commons.collections4.map.HashedMap; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.sr.kafka.KafkaSchemaInfoProviderFactory; +import org.apache.pulsar.sr.kafka.KafkaSchemas; + +import java.util.Map; + +public class SchemaRegistryKVDemo { + + public static void main(String[] args) throws Exception { + // set Kafka schema registry related configurations + Map srConfig = new HashedMap<>(); + srConfig.put("schema.registry.url", "http://localhost:8001"); + + PulsarClient client = PulsarClient.builder() + .serviceUrl("pulsar://localhost:6650") + .schemaInfoProviderFactory(new KafkaSchemaInfoProviderFactory(srConfig)) + .build(); + + String topic = "kv-test"; + + Schema> schema = KafkaSchemas.KV( + Schema.STRING, KafkaSchemas.JSON(User.class)); + + Producer> producer = client.newProducer(schema) + .topic(topic) + .create(); + + Consumer> consumer = client.newConsumer(schema) + .topic(topic) + .subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + for (int i = 0; i < 10; i++) { + producer.send(new KeyValue<>("key-" + i, new User("name-" + i, 10 + i))); + } + + for (int i = 0; i < 10; i++) { + Message> message = consumer.receive(); + consumer.acknowledge(message); + System.out.println("receive msg " + message.getValue().getKey() + " " + message.getValue().getValue()); + } + + client.close(); + } + +} diff --git a/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/User.java b/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/User.java new file mode 100644 index 0000000..04af190 --- /dev/null +++ b/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/User.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.sn; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class User { + + private String name; + private Integer age; + +} diff --git a/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/UserKey.java b/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/UserKey.java new file mode 100644 index 0000000..cd9a54e --- /dev/null +++ b/clients-schema-registry/java/src/main/java/org/apache/pulsar/sn/UserKey.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.sn; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class UserKey { + + private String name; + private Integer id; + +}