diff --git a/README.md b/README.md index d32316d0..b338bf1c 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ repositories { } ext { - rlibVersion = "10.0.alpha" + rlibVersion = "10.0.alpha3" } dependencies { diff --git a/build.gradle b/build.gradle index 37693f60..33804a0e 100644 --- a/build.gradle +++ b/build.gradle @@ -1,4 +1,4 @@ -rootProject.version = "10.0.alpha2" +rootProject.version = "10.0.alpha3" group = 'javasabr.rlib' allprojects { diff --git a/rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java b/rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java index 5b9922f9..44a14e15 100644 --- a/rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java +++ b/rlib-network/src/loadTest/java/javasabr/rlib/network/StringNetworkLoadTest.java @@ -19,6 +19,7 @@ import javasabr.rlib.network.client.ClientNetwork; import javasabr.rlib.network.impl.DefaultBufferAllocator; import javasabr.rlib.network.impl.StringDataConnection; +import javasabr.rlib.network.packet.impl.StringReadableNetworkPacket; import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket; import javasabr.rlib.network.server.ServerNetwork; import lombok.CustomLog; @@ -77,7 +78,7 @@ void connectAndSendMessages( int delay = random.nextInt(MAX_SEND_DELAY); ScheduledFuture schedule = executor.schedule( () -> { - StringWritableNetworkPacket message = newMessage(10, 10240); + var message = newMessage(10, 10240); connection.send(message); }, delay, TimeUnit.MILLISECONDS); tasks.add(schedule); @@ -120,7 +121,7 @@ void testServerWithMultiplyClients() { var serverConfig = SimpleServerNetworkConfig .builder() - .threadGroupSize(10) + .threadGroupMaxSize(10) .writeBufferSize(1024) .readBufferSize(1024) .pendingBufferSize(2048) @@ -141,10 +142,11 @@ void testServerWithMultiplyClients() { serverNetwork.onAccept(accepted -> accepted .onReceive((connection, packet) -> { + StringReadableNetworkPacket receivedPacket = (StringReadableNetworkPacket) packet; statistics .receivedClientPackersPerSecond() .accumulate(1); - connection.send(new StringWritableNetworkPacket("Echo: " + packet.data())); + connection.send(new StringWritableNetworkPacket<>("Echo: " + receivedPacket.data())); statistics .sentEchoPackersPerSecond() .accumulate(1); @@ -200,7 +202,9 @@ private static void initReceivedMessagesTracker( }, 1, 1, TimeUnit.SECONDS); } - private static StringWritableNetworkPacket newMessage(int minMessageLength, int maxMessageLength) { - return new StringWritableNetworkPacket(StringUtils.generate(minMessageLength, maxMessageLength)); + private static StringWritableNetworkPacket newMessage( + int minMessageLength, + int maxMessageLength) { + return new StringWritableNetworkPacket<>(StringUtils.generate(minMessageLength, maxMessageLength)); } } diff --git a/rlib-network/src/loadTest/java/javasabr/rlib/network/StringSslNetworkLoadTest.java b/rlib-network/src/loadTest/java/javasabr/rlib/network/StringSslNetworkLoadTest.java index 1338f13d..91a8120d 100644 --- a/rlib-network/src/loadTest/java/javasabr/rlib/network/StringSslNetworkLoadTest.java +++ b/rlib-network/src/loadTest/java/javasabr/rlib/network/StringSslNetworkLoadTest.java @@ -19,10 +19,8 @@ import javasabr.rlib.network.ServerNetworkConfig.SimpleServerNetworkConfig; import javasabr.rlib.network.client.ClientNetwork; import javasabr.rlib.network.impl.DefaultBufferAllocator; -import javasabr.rlib.network.impl.StringDataConnection; import javasabr.rlib.network.impl.StringDataSslConnection; -import javasabr.rlib.network.packet.impl.AbstractSslNetworkPacketReader; -import javasabr.rlib.network.packet.impl.AbstractSslNetworkPacketWriter; +import javasabr.rlib.network.packet.impl.StringReadableNetworkPacket; import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket; import javasabr.rlib.network.server.ServerNetwork; import javasabr.rlib.network.util.NetworkUtils; @@ -83,7 +81,7 @@ void connectAndSendMessages( int delay = random.nextInt(MAX_SEND_DELAY); ScheduledFuture schedule = executor.schedule( () -> { - StringWritableNetworkPacket message = newMessage(10, 10240); // 10240 + var message = newMessage(10, 10240); // 10240 connection.send(message); }, delay, TimeUnit.MILLISECONDS); tasks.add(schedule); @@ -128,7 +126,7 @@ void testServerWithMultiplyClients() { var serverConfig = SimpleServerNetworkConfig .builder() - .threadGroupSize(10) + .threadGroupMaxSize(10) .writeBufferSize(1024) .readBufferSize(1024) .pendingBufferSize(2048) @@ -153,10 +151,11 @@ void testServerWithMultiplyClients() { serverNetwork.onAccept(accepted -> accepted .onReceive((connection, packet) -> { + StringReadableNetworkPacket receivedPacket = (StringReadableNetworkPacket) packet; statistics .receivedClientPackersPerSecond() .accumulate(1); - connection.send(new StringWritableNetworkPacket("Echo: " + packet.data())); + connection.send(new StringWritableNetworkPacket<>("Echo: " + receivedPacket.data())); statistics .sentEchoPackersPerSecond() .accumulate(1); @@ -215,7 +214,9 @@ private static void initReceivedMessagesTracker( }, 1, 1, TimeUnit.SECONDS); } - private static StringWritableNetworkPacket newMessage(int minMessageLength, int maxMessageLength) { - return new StringWritableNetworkPacket(StringUtils.generate(minMessageLength, maxMessageLength)); + private static StringWritableNetworkPacket newMessage( + int minMessageLength, + int maxMessageLength) { + return new StringWritableNetworkPacket<>(StringUtils.generate(minMessageLength, maxMessageLength)); } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/BufferAllocator.java b/rlib-network/src/main/java/javasabr/rlib/network/BufferAllocator.java index 0bcbfc99..d713c3f1 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/BufferAllocator.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/BufferAllocator.java @@ -1,7 +1,6 @@ package javasabr.rlib.network; import java.nio.ByteBuffer; -import org.jspecify.annotations.Nullable; /** * The interface to implement a buffer allocator for network things. diff --git a/rlib-network/src/main/java/javasabr/rlib/network/Connection.java b/rlib-network/src/main/java/javasabr/rlib/network/Connection.java index af7eb4bc..52b3040b 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/Connection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/Connection.java @@ -11,11 +11,9 @@ * * @author JavaSaBr */ -public interface Connection { - - record ReceivedPacketEvent, R extends ReadableNetworkPacket>( - C connection, R packet) { +public interface Connection> { + record ReceivedPacketEvent(C connection, R packet) { @Override public String toString() { return "[" + connection + "|" + packet + ']'; @@ -45,27 +43,45 @@ public String toString() { /** * Send a packet to connection's owner. */ - void send(W packet); + void send(WritableNetworkPacket packet); /** * Send a packet to connection's owner with async feedback of this sending. * * @return the async result with true if the packet was sent or false if sending was failed. */ - CompletableFuture sendWithFeedback(W packet); + CompletableFuture sendWithFeedback(WritableNetworkPacket packet); /** * Register a consumer to handle received packets. */ - void onReceive(BiConsumer, ? super R> consumer); + void onReceive(BiConsumer> consumer); /** * Get a stream of received packet events. */ - Flux, ? extends R>> receivedEvents(); + Flux>> receivedEvents(); + + /** + * Get a stream of received packet events with expected packet type. + */ + default > Flux> receivedEvents(Class packetType) { + return receivedEvents() + .filter(event -> packetType.isInstance(event.packet())) + .map(event -> (ReceivedPacketEvent) event); + } /** * Get a stream of received packets. */ - Flux receivedPackets(); + Flux> receivedPackets(); + + /** + * Get a stream of received packets with expected type. + */ + default > Flux receivedPackets(Class packetType) { + return receivedPackets() + .filter(packetType::isInstance) + .map(networkPacket -> (R) networkPacket); + } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/Network.java b/rlib-network/src/main/java/javasabr/rlib/network/Network.java index 1778af27..14030f12 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/Network.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/Network.java @@ -1,11 +1,17 @@ package javasabr.rlib.network; +import java.util.concurrent.ScheduledExecutorService; + /** * The interface to implement an asynchronous network. * * @author JavaSaBr */ -public interface Network> { +public interface Network> { + + ScheduledExecutorService scheduledExecutor(); + + NetworkConfig config(); /** * Shutdown this network. diff --git a/rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java b/rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java index 7dbbd664..ace95432 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/NetworkConfig.java @@ -28,6 +28,8 @@ class SimpleNetworkConfig implements NetworkConfig { private int pendingBufferSize = 4096; @Builder.Default private int writeBufferSize = 2048; + @Builder.Default + private int retryDelayInMs = 1000; } NetworkConfig DEFAULT_CLIENT = new NetworkConfig() { @@ -46,34 +48,53 @@ default String threadGroupName() { } /** - * Get size of buffer with used to collect received data from network. + * Get a group name of scheduling network threads. + */ + default String scheduledThreadGroupName() { + return "ScheduledNetworkThread"; + } + + /** + * Get size of buffer with will be used to collect received data from network. */ default int readBufferSize() { return 2048; } /** - * Get size of buffer with pending data. Pending buffer allows to construct a packet with bigger data than + * Gets size of buffer with pending reading data. Pending buffer allows to construct a packet with bigger data part than * {@link #readBufferSize()}. It should be at least 2x of {@link #readBufferSize()} - * - * @return the pending buffer's size. */ default int pendingBufferSize() { return readBufferSize() * 2; } /** - * Get size of buffer which used to serialize packets to bytes. + * Gets a size of buffer which will be used for packet serialization. */ default int writeBufferSize() { return 2048; } + /** + * Gets a timeout for retry read/write operation. + */ + default int retryDelayInMs() { + return 1000; + } + + /** + * Gets a max allowed empty reads from socket channel before closing a connection. + */ + default int maxEmptyReadsBeforeClose() { + return 3; + } + default ByteOrder byteOrder() { return ByteOrder.BIG_ENDIAN; } - default boolean isDirectByteBuffer() { + default boolean useDirectByteBuffer() { return false; } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/NetworkFactory.java b/rlib-network/src/main/java/javasabr/rlib/network/NetworkFactory.java index 5d27f4b8..187ec72b 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/NetworkFactory.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/NetworkFactory.java @@ -23,13 +23,13 @@ @UtilityClass public final class NetworkFactory { - public static > ClientNetwork clientNetwork( + public static > ClientNetwork clientNetwork( NetworkConfig networkConfig, BiFunction, AsynchronousSocketChannel, C> channelToConnection) { return new DefaultClientNetwork<>(networkConfig, channelToConnection); } - public static > ServerNetwork serverNetwork( + public static > ServerNetwork serverNetwork( ServerNetworkConfig networkConfig, BiFunction, AsynchronousSocketChannel, C> channelToConnection) { return new DefaultServerNetwork<>(networkConfig, channelToConnection); @@ -65,7 +65,7 @@ public static ClientNetwork stringDataClientNetwork( * Create id based packet default asynchronous client network. */ public static ClientNetwork defaultClientNetwork( - ReadableNetworkPacketRegistry packetRegistry) { + ReadableNetworkPacketRegistry, DefaultConnection> packetRegistry) { return defaultClientNetwork( NetworkConfig.DEFAULT_CLIENT, new DefaultBufferAllocator(NetworkConfig.DEFAULT_CLIENT), @@ -78,8 +78,8 @@ public static ClientNetwork defaultClientNetwork( public static ClientNetwork defaultClientNetwork( NetworkConfig networkConfig, BufferAllocator bufferAllocator, - ReadableNetworkPacketRegistry packetRegistry) { - return clientNetwork( + ReadableNetworkPacketRegistry, DefaultConnection> packetRegistry) { + return NetworkFactory.clientNetwork( networkConfig, (network, channel) -> new DefaultConnection(network, channel, bufferAllocator, packetRegistry)); } @@ -139,7 +139,7 @@ public static ServerNetwork stringDataSslServerNetwork( * Create id based packet default asynchronous server network. */ public static ServerNetwork defaultServerNetwork( - ReadableNetworkPacketRegistry packetRegistry) { + ReadableNetworkPacketRegistry, DefaultConnection> packetRegistry) { return defaultServerNetwork( ServerNetworkConfig.DEFAULT_SERVER, new DefaultBufferAllocator(ServerNetworkConfig.DEFAULT_SERVER), @@ -152,7 +152,7 @@ public static ServerNetwork defaultServerNetwork( public static ServerNetwork defaultServerNetwork( ServerNetworkConfig networkConfig, BufferAllocator bufferAllocator, - ReadableNetworkPacketRegistry packetRegistry) { + ReadableNetworkPacketRegistry, DefaultConnection> packetRegistry) { return serverNetwork( networkConfig, (network, channel) -> new DefaultConnection(network, channel, bufferAllocator, packetRegistry)); diff --git a/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java b/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java index 2a3711c0..36855877 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/ServerNetworkConfig.java @@ -32,7 +32,13 @@ class SimpleServerNetworkConfig implements ServerNetworkConfig { @Builder.Default private int writeBufferSize = 2048; @Builder.Default - private int threadGroupSize = 1; + private int retryDelayInMs = 1000; + @Builder.Default + private int threadGroupMinSize = 1; + @Builder.Default + private int threadGroupMaxSize = 1; + @Builder.Default + private int scheduledThreadGroupSize = 1; @Builder.Default private int threadPriority = Thread.NORM_PRIORITY; } @@ -40,13 +46,13 @@ class SimpleServerNetworkConfig implements ServerNetworkConfig { ServerNetworkConfig DEFAULT_SERVER = new ServerNetworkConfig() { @Override - public int threadGroupMinSize() { - return 1; + public String threadGroupName() { + return "ServerNetworkThread"; } @Override - public String threadGroupName() { - return "ServerNetworkThread"; + public String scheduledThreadGroupName() { + return "ServerScheduledNetworkThread"; } }; @@ -64,6 +70,13 @@ default int threadGroupMaxSize() { return threadGroupMinSize(); } + /** + * Get a size of network scheduled thread executor. + */ + default int scheduledThreadGroupSize() { + return 1; + } + /** * Get a thread constructor which should be used to create network threads. */ diff --git a/rlib-network/src/main/java/javasabr/rlib/network/UnsafeConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/UnsafeConnection.java index 7447fc97..2f8108ff 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/UnsafeConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/UnsafeConnection.java @@ -1,10 +1,8 @@ package javasabr.rlib.network; -import javasabr.rlib.network.packet.ReadableNetworkPacket; -import javasabr.rlib.network.packet.WritableNetworkPacket; +public interface UnsafeConnection> extends Connection { -public interface UnsafeConnection - extends Connection { + Network network(); void onConnected(); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/client/ClientNetwork.java b/rlib-network/src/main/java/javasabr/rlib/network/client/ClientNetwork.java index e8a4b7ed..586c43ef 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/client/ClientNetwork.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/client/ClientNetwork.java @@ -13,7 +13,7 @@ * * @author JavaSaBr */ -public interface ClientNetwork> extends Network { +public interface ClientNetwork> extends Network { /** * Connect to a server by the address. diff --git a/rlib-network/src/main/java/javasabr/rlib/network/client/impl/DefaultClientNetwork.java b/rlib-network/src/main/java/javasabr/rlib/network/client/impl/DefaultClientNetwork.java index b682fed9..a39d2b96 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/client/impl/DefaultClientNetwork.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/client/impl/DefaultClientNetwork.java @@ -6,9 +6,12 @@ import java.nio.channels.CompletionHandler; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import javasabr.rlib.common.util.AsyncUtils; +import javasabr.rlib.common.util.GroupThreadFactory; import javasabr.rlib.common.util.ThreadUtils; import javasabr.rlib.common.util.Utils; import javasabr.rlib.network.Connection; @@ -33,10 +36,14 @@ @CustomLog @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public class DefaultClientNetwork> extends AbstractNetwork implements ClientNetwork { +public class DefaultClientNetwork> extends AbstractNetwork + implements ClientNetwork { final AtomicBoolean connecting; + @Getter + final ScheduledExecutorService scheduledExecutor; + @Nullable @Getter(AccessLevel.PROTECTED) volatile CompletableFuture pendingConnection; @@ -50,6 +57,8 @@ public DefaultClientNetwork( BiFunction, AsynchronousSocketChannel, C> channelToConnection) { super(config, channelToConnection); this.connecting = new AtomicBoolean(false); + this.scheduledExecutor = Executors + .newSingleThreadScheduledExecutor(new GroupThreadFactory(config.scheduledThreadGroupName())); log.info(config, DefaultClientNetwork::buildConfigDescription); } @@ -84,7 +93,7 @@ public CompletableFuture connectAsync(InetSocketAddress serverAddress) { @Override public void completed(@Nullable Void result, DefaultClientNetwork network) { SocketAddress remoteAddress = NetworkUtils.getRemoteAddress(channel); - log.info(remoteAddress, "Connected to server:[%s]"::formatted); + log.info(remoteAddress, "[%s] Connected to server."::formatted); asyncResult.complete(channelToConnection.apply(network, channel)); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java index 26c78c55..b5615cf5 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractConnection.java @@ -39,30 +39,30 @@ @CustomLog @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class AbstractConnection implements - UnsafeConnection { +public abstract class AbstractConnection> implements UnsafeConnection { - private static class WritablePacketWithFeedback extends - WritablePacketWrapper, W> { + private static class WritablePacketWithFeedback> + extends WritablePacketWrapper, C> { - public WritablePacketWithFeedback(CompletableFuture attachment, W packet) { + public WritablePacketWithFeedback(CompletableFuture attachment, WritableNetworkPacket packet) { super(attachment, packet); } } @Getter final String remoteAddress; + @Getter + final Network network; - final Network> network; final BufferAllocator bufferAllocator; final AsynchronousSocketChannel channel; - final Deque pendingPackets; + final Deque> pendingPackets; final StampedLock lock; final AtomicBoolean isWriting; final AtomicBoolean closed; - final MutableArray, ? super R>> subscribers; + final MutableArray>> subscribers; final int maxPacketsByRead; @@ -70,7 +70,7 @@ public WritablePacketWithFeedback(CompletableFuture attachment, W packe volatile long lastActivity; public AbstractConnection( - Network> network, + Network network, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, int maxPacketsByRead) { @@ -94,25 +94,25 @@ public void onConnected() {} protected abstract NetworkPacketWriter packetWriter(); @Override - public void onReceive(BiConsumer, ? super R> consumer) { + public void onReceive(BiConsumer> consumer) { subscribers.add(consumer); packetReader().startRead(); } @Override - public Flux, ? extends R>> receivedEvents() { + public Flux>> receivedEvents() { return Flux.create(this::registerFluxOnReceivedEvents); } @Override - public Flux receivedPackets() { + public Flux> receivedPackets() { return Flux.create(this::registerFluxOnReceivedPackets); } protected void registerFluxOnReceivedEvents( - FluxSink, ? extends R>> sink) { + FluxSink>> sink) { - BiConsumer, R> listener = + BiConsumer> listener = (connection, packet) -> sink.next(new ReceivedPacketEvent<>(connection, packet)); @@ -121,14 +121,14 @@ protected void registerFluxOnReceivedEvents( sink.onDispose(() -> subscribers.remove(listener)); } - protected void registerFluxOnReceivedPackets(FluxSink sink) { - BiConsumer, R> listener = (connection, packet) -> sink.next(packet); + protected void registerFluxOnReceivedPackets(FluxSink> sink) { + BiConsumer> listener = (connection, packet) -> sink.next(packet); onReceive(listener); sink.onDispose(() -> subscribers.remove(listener)); } @Nullable - protected WritableNetworkPacket nextPacketToWrite() { + protected WritableNetworkPacket nextPacketToWrite() { long stamp = lock.writeLock(); try { return pendingPackets.poll(); @@ -168,29 +168,29 @@ public boolean closed() { return closed.get(); } - protected void serializedPacket(WritableNetworkPacket packet) {} + protected void serializedPacket(WritableNetworkPacket packet) {} - protected void handleReceivedPacket(R packet) { + protected void handleReceivedPacket(ReadableNetworkPacket packet) { log.debug(packet, remoteAddress, "Handle received packet:[%s] from:[%s]"::formatted); subscribers .iterations() - .forEach(this, packet, BiConsumer::accept); + .forEach((C) this, packet, BiConsumer::accept); } - protected void handleSentPacket(WritableNetworkPacket packet, boolean result) { - if (packet instanceof WritablePacketWithFeedback) { - ((WritablePacketWithFeedback) packet) + protected void handleSentPacket(WritableNetworkPacket packet, boolean result) { + if (packet instanceof WritablePacketWithFeedback withFeedback) { + withFeedback .getAttachment() .complete(result); } } @Override - public final void send(W packet) { + public final void send(WritableNetworkPacket packet) { sendImpl(packet); } - protected void sendImpl(WritableNetworkPacket packet) { + protected void sendImpl(WritableNetworkPacket packet) { if (closed()) { return; @@ -206,7 +206,7 @@ protected void sendImpl(WritableNetworkPacket packet) { packetWriter().tryToSendNextPacket(); } - protected void queueAtFirst(WritableNetworkPacket packet) { + protected void queueAtFirst(WritableNetworkPacket packet) { long stamp = lock.writeLock(); try { pendingPackets.addFirst(packet); @@ -216,7 +216,7 @@ protected void queueAtFirst(WritableNetworkPacket packet) { } @Override - public CompletableFuture sendWithFeedback(W packet) { + public CompletableFuture sendWithFeedback(WritableNetworkPacket packet) { var asyncResult = new CompletableFuture(); diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractNetwork.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractNetwork.java index 592fc997..183306c3 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractNetwork.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractNetwork.java @@ -7,7 +7,9 @@ import javasabr.rlib.network.NetworkConfig; import lombok.AccessLevel; import lombok.CustomLog; +import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.experimental.Accessors; import lombok.experimental.FieldDefaults; /** @@ -17,8 +19,11 @@ */ @CustomLog @RequiredArgsConstructor +@Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true) -public abstract class AbstractNetwork> implements Network { +public abstract class AbstractNetwork> implements Network { + + @Getter NetworkConfig config; BiFunction, AsynchronousSocketChannel, C> channelToConnection; } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractSslConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractSslConnection.java index d88aab8b..9ae853d7 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractSslConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/AbstractSslConnection.java @@ -2,9 +2,7 @@ import java.nio.channels.AsynchronousSocketChannel; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; import javasabr.rlib.network.Network; -import javasabr.rlib.network.packet.ReadableNetworkPacket; import javasabr.rlib.network.packet.WritableNetworkPacket; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -13,13 +11,13 @@ import lombok.experimental.FieldDefaults; @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class AbstractSslConnection - extends AbstractConnection { +public abstract class AbstractSslConnection> + extends AbstractConnection { final SSLEngine sslEngine; public AbstractSslConnection( - Network> network, + Network network, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, SSLContext sslContext, @@ -36,7 +34,7 @@ public AbstractSslConnection( } @Override - protected void sendImpl(WritableNetworkPacket packet) { + protected void sendImpl(WritableNetworkPacket packet) { super.sendImpl(packet); packetReader().startRead(); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultBufferAllocator.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultBufferAllocator.java index a23fe361..6efa6496 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultBufferAllocator.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultBufferAllocator.java @@ -3,8 +3,6 @@ import java.nio.ByteBuffer; import javasabr.rlib.network.BufferAllocator; import javasabr.rlib.network.NetworkConfig; -import javasabr.rlib.reusable.pool.Pool; -import javasabr.rlib.reusable.pool.PoolFactory; import lombok.AccessLevel; import lombok.CustomLog; import lombok.ToString; @@ -20,24 +18,17 @@ @FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true) public class DefaultBufferAllocator implements BufferAllocator { - final Pool readBufferPool; - final Pool pendingBufferPool; - final Pool writeBufferPool; - final NetworkConfig config; public DefaultBufferAllocator(NetworkConfig config) { this.config = config; - this.readBufferPool = PoolFactory.newLockBasePool(ByteBuffer.class); - this.pendingBufferPool = PoolFactory.newLockBasePool(ByteBuffer.class); - this.writeBufferPool = PoolFactory.newLockBasePool(ByteBuffer.class); } @Override public ByteBuffer takeReadBuffer() { int bufferSize = config.readBufferSize(); log.debug(bufferSize, "Allocate new read buffer with size:[%s]"::formatted); - return config.isDirectByteBuffer() + return config.useDirectByteBuffer() ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer .allocate(bufferSize) @@ -49,7 +40,7 @@ public ByteBuffer takeReadBuffer() { public ByteBuffer takePendingBuffer() { int bufferSize = config.pendingBufferSize(); log.debug(bufferSize, "Allocate new pending buffer with size:[%s]"::formatted); - return config.isDirectByteBuffer() + return config.useDirectByteBuffer() ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer .allocate(bufferSize) @@ -61,7 +52,7 @@ public ByteBuffer takePendingBuffer() { public ByteBuffer takeWriteBuffer() { int bufferSize = config.writeBufferSize(); log.debug(bufferSize, "Allocate new write buffer with size:[%s]"::formatted); - return config.isDirectByteBuffer() + return config.useDirectByteBuffer() ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer .allocate(bufferSize) @@ -72,7 +63,7 @@ public ByteBuffer takeWriteBuffer() { @Override public ByteBuffer takeBuffer(int bufferSize) { log.debug(bufferSize, "Allocate new buffer with size:[%s]"::formatted); - return config.isDirectByteBuffer() + return config.useDirectByteBuffer() ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer .allocate(bufferSize) diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultConnection.java index 26a404ad..c229f80c 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultConnection.java @@ -2,23 +2,20 @@ import java.nio.channels.AsynchronousSocketChannel; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; import javasabr.rlib.network.Network; -import javasabr.rlib.network.packet.impl.DefaultReadableNetworkPacket; -import javasabr.rlib.network.packet.impl.DefaultWritableNetworkPacket; +import javasabr.rlib.network.packet.IdBasedReadableNetworkPacket; import javasabr.rlib.network.packet.registry.ReadableNetworkPacketRegistry; /** * @author JavaSaBr */ -public class DefaultConnection extends - IdBasedPacketConnection { +public class DefaultConnection extends IdBasedPacketConnection { public DefaultConnection( - Network> network, + Network network, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, - ReadableNetworkPacketRegistry packetRegistry) { + ReadableNetworkPacketRegistry, DefaultConnection> packetRegistry) { super(network, channel, bufferAllocator, packetRegistry, 100, 2, 2); } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataConnection.java index 8d6667cc..e15bd66d 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataConnection.java @@ -2,12 +2,10 @@ import java.nio.channels.AsynchronousSocketChannel; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; import javasabr.rlib.network.Network; import javasabr.rlib.network.packet.NetworkPacketReader; import javasabr.rlib.network.packet.NetworkPacketWriter; import javasabr.rlib.network.packet.ReadableNetworkPacket; -import javasabr.rlib.network.packet.WritableNetworkPacket; import javasabr.rlib.network.packet.impl.DefaultNetworkPacketReader; import javasabr.rlib.network.packet.impl.DefaultNetworkPacketWriter; import lombok.AccessLevel; @@ -21,8 +19,7 @@ @Getter(AccessLevel.PROTECTED) @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class DefaultDataConnection - extends AbstractConnection { +public abstract class DefaultDataConnection> extends AbstractConnection { final NetworkPacketReader packetReader; final NetworkPacketWriter packetWriter; @@ -30,7 +27,7 @@ public abstract class DefaultDataConnection> network, + Network network, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, int maxPacketsByRead, @@ -43,7 +40,7 @@ public DefaultDataConnection( protected NetworkPacketReader createPacketReader() { return new DefaultNetworkPacketReader<>( - this, + (C) this, channel, bufferAllocator, this::updateLastActivity, @@ -54,8 +51,8 @@ protected NetworkPacketReader createPacketReader() { } protected NetworkPacketWriter createPacketWriter() { - return new DefaultNetworkPacketWriter>( - this, + return new DefaultNetworkPacketWriter<>( + (C) this, channel, bufferAllocator, this::updateLastActivity, @@ -65,5 +62,5 @@ protected NetworkPacketWriter createPacketWriter() { packetLengthHeaderSize); } - protected abstract R createReadablePacket(); + protected abstract ReadableNetworkPacket createReadablePacket(); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataSslConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataSslConnection.java index 2708e94a..30ac62be 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataSslConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/DefaultDataSslConnection.java @@ -2,12 +2,10 @@ import java.nio.channels.AsynchronousSocketChannel; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; import javasabr.rlib.network.Network; import javasabr.rlib.network.packet.NetworkPacketReader; import javasabr.rlib.network.packet.NetworkPacketWriter; import javasabr.rlib.network.packet.ReadableNetworkPacket; -import javasabr.rlib.network.packet.WritableNetworkPacket; import javasabr.rlib.network.packet.impl.DefaultSslNetworkPacketReader; import javasabr.rlib.network.packet.impl.DefaultSslNetworkPacketWriter; import javax.net.ssl.SSLContext; @@ -22,8 +20,7 @@ @Getter(AccessLevel.PROTECTED) @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class DefaultDataSslConnection - extends AbstractSslConnection { +public abstract class DefaultDataSslConnection> extends AbstractSslConnection { final NetworkPacketReader packetReader; final NetworkPacketWriter packetWriter; @@ -31,7 +28,7 @@ public abstract class DefaultDataSslConnection> network, + Network network, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, SSLContext sslContext, @@ -46,7 +43,7 @@ public DefaultDataSslConnection( protected NetworkPacketReader createPacketReader() { return new DefaultSslNetworkPacketReader<>( - this, + (C) this, channel, bufferAllocator, this::updateLastActivity, @@ -59,8 +56,8 @@ protected NetworkPacketReader createPacketReader() { } protected NetworkPacketWriter createPacketWriter() { - return new DefaultSslNetworkPacketWriter>( - this, + return new DefaultSslNetworkPacketWriter<>( + (C) this, channel, bufferAllocator, this::updateLastActivity, @@ -72,5 +69,5 @@ protected NetworkPacketWriter createPacketWriter() { packetLengthHeaderSize); } - protected abstract R createReadablePacket(); + protected abstract ReadableNetworkPacket createReadablePacket(); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/IdBasedPacketConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/IdBasedPacketConnection.java index d71d528a..31ec2de4 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/IdBasedPacketConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/IdBasedPacketConnection.java @@ -2,13 +2,11 @@ import java.nio.channels.AsynchronousSocketChannel; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; import javasabr.rlib.network.Network; import javasabr.rlib.network.packet.IdBasedReadableNetworkPacket; -import javasabr.rlib.network.packet.IdBasedWritableNetworkPacket; import javasabr.rlib.network.packet.NetworkPacketReader; import javasabr.rlib.network.packet.NetworkPacketWriter; -import javasabr.rlib.network.packet.impl.IdBasedPacketReader; +import javasabr.rlib.network.packet.impl.IdBasedNetworkPacketReader; import javasabr.rlib.network.packet.impl.IdBasedNetworkPacketWriter; import javasabr.rlib.network.packet.registry.ReadableNetworkPacketRegistry; import lombok.AccessLevel; @@ -22,21 +20,20 @@ @Getter(AccessLevel.PROTECTED) @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public class IdBasedPacketConnection, W extends IdBasedWritableNetworkPacket> - extends AbstractConnection { +public class IdBasedPacketConnection> extends AbstractConnection { final NetworkPacketReader packetReader; final NetworkPacketWriter packetWriter; - final ReadableNetworkPacketRegistry packetRegistry; + final ReadableNetworkPacketRegistry, C> packetRegistry; final int packetLengthHeaderSize; final int packetIdHeaderSize; public IdBasedPacketConnection( - Network> network, + Network network, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, - ReadableNetworkPacketRegistry packetRegistry, + ReadableNetworkPacketRegistry, C> packetRegistry, int maxPacketsByRead, int packetLengthHeaderSize, int packetIdHeaderSize) { @@ -49,8 +46,8 @@ public IdBasedPacketConnection( } protected NetworkPacketReader createPacketReader() { - return new IdBasedPacketReader<>( - this, + return new IdBasedNetworkPacketReader<>( + (C) this, channel, bufferAllocator, this::updateLastActivity, @@ -63,11 +60,11 @@ protected NetworkPacketReader createPacketReader() { protected NetworkPacketWriter createPacketWriter() { return new IdBasedNetworkPacketWriter<>( - this, + (C) this, channel, bufferAllocator, this::updateLastActivity, - this::nextPacketToWrite, + () -> nextPacketToWrite(), this::serializedPacket, this::handleSentPacket, packetLengthHeaderSize, diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/PackedIdBasedConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/PackedIdBasedConnection.java index 9589f06e..d172d847 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/PackedIdBasedConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/PackedIdBasedConnection.java @@ -7,7 +7,7 @@ * * @author JavaSaBr */ -public interface PackedIdBasedConnection extends Connection { +public interface PackedIdBasedConnection> extends Connection { /** * Get length of packet's header with packet's data length. diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/ReuseBufferAllocator.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/ReuseBufferAllocator.java index fa771afa..ad71c4b3 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/ReuseBufferAllocator.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/ReuseBufferAllocator.java @@ -58,7 +58,7 @@ protected Function pendingBufferFactory() { return config -> { var bufferSize = config.pendingBufferSize(); log.debug(bufferSize, "Allocate new pending buffer with size:[%s]"::formatted); - return config.isDirectByteBuffer() + return config.useDirectByteBuffer() ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer .allocate(bufferSize) @@ -71,7 +71,7 @@ protected Function readBufferFactory() { return config -> { var bufferSize = config.readBufferSize(); log.debug(bufferSize, "Allocate new read buffer with size:[%s]"::formatted); - return config.isDirectByteBuffer() + return config.useDirectByteBuffer() ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer .allocate(bufferSize) @@ -84,7 +84,7 @@ protected Function writeBufferFactory() { return config -> { var bufferSize = config.writeBufferSize(); log.debug(bufferSize, "Allocate new write buffer with size:[%s]"::formatted); - return config.isDirectByteBuffer() + return config.useDirectByteBuffer() ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer .allocate(bufferSize) @@ -130,7 +130,7 @@ public ByteBuffer takeBuffer(int bufferSize) { } log.debug(bufferSize, "Allocate a new buffer with size:[%s]"::formatted); - return config.isDirectByteBuffer() + return config.useDirectByteBuffer() ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer .allocate(bufferSize) diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/StringDataConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/StringDataConnection.java index 52d61ea9..b7a3addd 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/StringDataConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/StringDataConnection.java @@ -2,25 +2,23 @@ import java.nio.channels.AsynchronousSocketChannel; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; import javasabr.rlib.network.Network; -import javasabr.rlib.network.packet.impl.StringReadablePacket; -import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket; +import javasabr.rlib.network.packet.impl.StringReadableNetworkPacket; /** * @author JavaSaBr */ -public class StringDataConnection extends DefaultDataConnection { +public class StringDataConnection extends DefaultDataConnection { public StringDataConnection( - Network> network, + Network network, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator) { super(network, channel, bufferAllocator, 100, 2); } @Override - protected StringReadablePacket createReadablePacket() { - return new StringReadablePacket(); + protected StringReadableNetworkPacket createReadablePacket() { + return new StringReadableNetworkPacket<>(); } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/impl/StringDataSslConnection.java b/rlib-network/src/main/java/javasabr/rlib/network/impl/StringDataSslConnection.java index c97363da..a5de0afd 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/impl/StringDataSslConnection.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/impl/StringDataSslConnection.java @@ -2,20 +2,17 @@ import java.nio.channels.AsynchronousSocketChannel; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; import javasabr.rlib.network.Network; -import javasabr.rlib.network.packet.impl.StringReadablePacket; -import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket; +import javasabr.rlib.network.packet.impl.StringReadableNetworkPacket; import javax.net.ssl.SSLContext; /** * @author JavaSaBr */ -public class StringDataSslConnection extends - DefaultDataSslConnection { +public class StringDataSslConnection extends DefaultDataSslConnection { public StringDataSslConnection( - Network> network, + Network network, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, SSLContext sslContext, @@ -24,7 +21,7 @@ public StringDataSslConnection( } @Override - protected StringReadablePacket createReadablePacket() { - return new StringReadablePacket(); + protected StringReadableNetworkPacket createReadablePacket() { + return new StringReadableNetworkPacket<>(); } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedNetworkPacket.java index cf32dfd3..9f75c928 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedNetworkPacket.java @@ -1,11 +1,12 @@ package javasabr.rlib.network.packet; +import javasabr.rlib.network.Connection; import javasabr.rlib.network.annotation.NetworkPacketDescription; /** * @author JavaSaBr */ -public interface IdBasedNetworkPacket extends NetworkPacket { +public interface IdBasedNetworkPacket> extends NetworkPacket { /** * Get id of this packet. diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedReadableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedReadableNetworkPacket.java index 9f3fa344..9664624d 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedReadableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedReadableNetworkPacket.java @@ -6,13 +6,13 @@ /** * @author JavaSaBr */ -public interface IdBasedReadableNetworkPacket> - extends ReadableNetworkPacket, IdBasedNetworkPacket { +public interface IdBasedReadableNetworkPacket> + extends ReadableNetworkPacket, IdBasedNetworkPacket { /** * Create a new instance of this type. */ - default S newInstance() { + default IdBasedReadableNetworkPacket newInstance() { return ClassUtils.newInstance(getClass()); } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedWritableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedWritableNetworkPacket.java index d157e6eb..d06c6bee 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedWritableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/IdBasedWritableNetworkPacket.java @@ -1,7 +1,10 @@ package javasabr.rlib.network.packet; +import javasabr.rlib.network.Connection; + /** * @author JavaSaBr */ -public interface IdBasedWritableNetworkPacket extends WritableNetworkPacket, IdBasedNetworkPacket { +public interface IdBasedWritableNetworkPacket> + extends WritableNetworkPacket, IdBasedNetworkPacket { } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/NetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/NetworkPacket.java index 5ee76d47..037500e7 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/NetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/NetworkPacket.java @@ -1,11 +1,13 @@ package javasabr.rlib.network.packet; +import javasabr.rlib.network.Connection; + /** * The interface to implement a network packet. * * @author JavaSaBr */ -public interface NetworkPacket { +public interface NetworkPacket> { /** * @return the packet's name. diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/ReadableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/ReadableNetworkPacket.java index 9438e58a..72407a02 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/ReadableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/ReadableNetworkPacket.java @@ -1,13 +1,14 @@ package javasabr.rlib.network.packet; import java.nio.ByteBuffer; +import javasabr.rlib.network.Connection; /** * The interface to implement a readable network packet. * * @author JavaSaBr */ -public interface ReadableNetworkPacket extends NetworkPacket { +public interface ReadableNetworkPacket> extends NetworkPacket { /** * Read packet's data from byte buffer. @@ -16,5 +17,5 @@ public interface ReadableNetworkPacket extends NetworkPacket { * @param remainingDataLength the expected remaining data length. * @return true if reading was success. */ - boolean read(ByteBuffer buffer, int remainingDataLength); + boolean read(C connection, ByteBuffer buffer, int remainingDataLength); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/ReusableWritablePacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/ReusableWritablePacket.java index bfbc96ba..ccbd5262 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/ReusableWritablePacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/ReusableWritablePacket.java @@ -1,5 +1,6 @@ package javasabr.rlib.network.packet; +import javasabr.rlib.network.Connection; import javasabr.rlib.reusable.Reusable; import javasabr.rlib.reusable.pool.Pool; @@ -8,7 +9,7 @@ * * @author JavaSaBr */ -public interface ReusableWritablePacket extends WritableNetworkPacket, Reusable { +public interface ReusableWritablePacket> extends WritableNetworkPacket, Reusable { /** * Handle completion of packet sending. @@ -49,7 +50,7 @@ public interface ReusableWritablePacket extends WritableNetworkPacket, Reusable * * @param pool the pool to store used packet. */ - void setPool(Pool pool); + void setPool(Pool> pool); default void notifyAddedToSend() { increaseSends(); diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/WritableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/WritableNetworkPacket.java index 91b997b6..1a9d149e 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/WritableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/WritableNetworkPacket.java @@ -1,26 +1,28 @@ package javasabr.rlib.network.packet; import java.nio.ByteBuffer; - +import javasabr.rlib.network.Connection; /** * Interface to implement a writable packet. * * @author JavaSaBr */ -public interface WritableNetworkPacket extends NetworkPacket { +public interface WritableNetworkPacket> extends NetworkPacket { + + int UNKNOWN_EXPECTED_BYTES = -1; /** * Write this packet to the buffer. * * @return true if writing was successful. */ - boolean write(ByteBuffer buffer); + boolean write(C connection, ByteBuffer buffer); /** - * @return expected data length of this packet or -1. + * @return expected data length of this packet or {@code UNKNOWN_EXPECTED_BYTES}. */ - default int expectedLength() { - return -1; + default int expectedLength(C connection) { + return UNKNOWN_EXPECTED_BYTES; } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractIdBasedReadableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractIdBasedReadableNetworkPacket.java index 0c57eeef..4e9a53d1 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractIdBasedReadableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractIdBasedReadableNetworkPacket.java @@ -1,10 +1,11 @@ package javasabr.rlib.network.packet.impl; +import javasabr.rlib.network.Connection; import javasabr.rlib.network.packet.IdBasedReadableNetworkPacket; /** * @author JavaSaBr */ -public abstract class AbstractIdBasedReadableNetworkPacket> - extends AbstractReadableNetworkPacket implements IdBasedReadableNetworkPacket { +public abstract class AbstractIdBasedReadableNetworkPacket> + extends AbstractReadableNetworkPacket implements IdBasedReadableNetworkPacket { } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacket.java index d42f0d3c..c0f4f97c 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacket.java @@ -3,6 +3,7 @@ import static javasabr.rlib.network.util.NetworkUtils.hexDump; import java.nio.ByteBuffer; +import javasabr.rlib.network.Connection; import javasabr.rlib.network.packet.NetworkPacket; import lombok.CustomLog; @@ -12,12 +13,12 @@ * @author JavaSaBr */ @CustomLog -public abstract class AbstractNetworkPacket implements NetworkPacket { +public abstract class AbstractNetworkPacket> implements NetworkPacket { /** * Handles packet data exception. */ - protected void handleException(ByteBuffer buffer, Exception exception) { + protected void handleException(C connection, ByteBuffer buffer, Exception exception) { log.warning(exception); if (!log.warningEnabled()) { return; @@ -33,7 +34,8 @@ protected void handleException(ByteBuffer buffer, Exception exception) { hexDump = hexDump(buffer.array(), buffer.position(), buffer.limit()); } - log.warning(name(), buffer, hexDump, "[%s] -> buffer:[%s]\n[%s]"::formatted); + log.warning(connection.remoteAddress(), name(), buffer, hexDump, + "[%s] Hexdump for:[%s] -> buffer:[%s]\n[%s]"::formatted); } @Override diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java index 567822a8..aae4fd2d 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketReader.java @@ -5,12 +5,18 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.ClosedChannelException; import java.nio.channels.CompletionHandler; +import java.nio.channels.InterruptedByTimeoutException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import javasabr.rlib.common.util.BufferUtils; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; +import javasabr.rlib.network.Network; +import javasabr.rlib.network.NetworkConfig; +import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.NetworkPacketReader; import javasabr.rlib.network.packet.ReadableNetworkPacket; import lombok.AccessLevel; @@ -27,8 +33,9 @@ @CustomLog @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class AbstractNetworkPacketReader> - implements NetworkPacketReader { +public abstract class AbstractNetworkPacketReader< + R extends ReadableNetworkPacket, + C extends UnsafeConnection> implements NetworkPacketReader { final CompletionHandler readChannelHandler = new CompletionHandler<>() { @@ -48,6 +55,7 @@ public void failed(Throwable exc, ByteBuffer readingBuffer) { }; final AtomicBoolean reading = new AtomicBoolean(false); + final AtomicInteger emptyReadsCounter = new AtomicInteger(0); final C connection; final AsynchronousSocketChannel socketChannel; @@ -93,12 +101,35 @@ protected String remoteAddress() { @Override public void startRead() { - if (!reading.compareAndSet(false, true)) { + if (connection.closed()) { + log.warning(connection.remoteAddress(), "[%s] Connection is already closed"::formatted); + return; + } else if (!reading.compareAndSet(false, true)) { + log.debug(connection.remoteAddress(), "[%s] Connection is already waiting for new data from channel"::formatted); return; } + startReadImpl(); + } + + protected void startReadImpl() { log.debug(remoteAddress(), "[%s] Start waiting for new data from channel..."::formatted); ByteBuffer buffer = bufferToReadFromChannel(); - socketChannel.read(buffer, buffer, readChannelHandler); + try { + socketChannel.read(buffer, buffer, readChannelHandler); + } catch (RuntimeException ex) { + log.error(ex); + if (reading.compareAndSet(true, false)) { + retryReadLater(); + } + } + } + + protected void retryReadLater() { + Network network = connection.network(); + NetworkConfig config = network.config(); + network + .scheduledExecutor() + .schedule(this::startRead, config.retryDelayInMs(), TimeUnit.MILLISECONDS); } /** @@ -287,7 +318,7 @@ else if (packetFullLength > tempBigBuffer.capacity()) { } protected void readAndHandlePacket(ByteBuffer bufferToRead, int remainingDataLength, R packetInstance) { - if (packetInstance.read(bufferToRead, remainingDataLength)) { + if (packetInstance.read(connection, bufferToRead, remainingDataLength)) { packetHandler.accept(packetInstance); } else { log.error(remoteAddress(), packetInstance, @@ -362,22 +393,43 @@ protected void freeTempBigBuffers() { */ protected void handleReceivedData(int receivedBytes, ByteBuffer readingBuffer) { updateActivityFunction.run(); - if (receivedBytes == -1) { + handleEmptyReadFromChannel(); + } else { + log.debug(remoteAddress(), receivedBytes, "[%s] Received [%s] bytes from channel"::formatted); + readingBuffer.flip(); + emptyReadsCounter.set(0); + try { + readPackets(readingBuffer); + } catch (Exception e) { + log.error(e); + } + startReadImpl(); + } + } + + protected void handleEmptyReadFromChannel() { + log.debug(remoteAddress(), "[%s] Received empty bytes from channel"::formatted); + + if (connection.closed()) { + reading.compareAndSet(true, false); + return; + } else if (!socketChannel.isOpen()) { connection.close(); return; } - log.debug(remoteAddress(), receivedBytes, "[%s] Received [%s] bytes from channel"::formatted); - readingBuffer.flip(); - try { - readPackets(readingBuffer); - } catch (Exception e) { - log.error(e); + NetworkConfig config = connection + .network() + .config(); + + if (emptyReadsCounter.incrementAndGet() > config.maxEmptyReadsBeforeClose()) { + connection.close(); + return; } if (reading.compareAndSet(true, false)) { - startRead(); + retryReadLater(); } } @@ -388,8 +440,16 @@ protected void handleReceivedData(int receivedBytes, ByteBuffer readingBuffer) { * @param readingBuffer the currently reading buffer. */ protected void handleFailedReceiving(Throwable exception, ByteBuffer readingBuffer) { + if (exception instanceof InterruptedByTimeoutException) { + if (reading.compareAndSet(true, false)) { + retryReadLater(); + } + return; + } if (exception instanceof AsynchronousCloseException) { log.info(remoteAddress(), "[%s] Connection was closed"::formatted); + } else if (exception instanceof ClosedChannelException) { + log.info(remoteAddress(), "[%s] Connection was closed"::formatted); } else { log.error(exception); connection.close(); @@ -434,11 +494,10 @@ protected abstract R createPacketFor( @Override public void close() { - bufferAllocator .putReadBuffer(readBuffer) .putPendingBuffer(pendingBuffer); - freeTempBigBuffers(); + reading.compareAndSet(true, false); } } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java index d2660ea7..214568ae 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractNetworkPacketWriter.java @@ -31,18 +31,19 @@ @RequiredArgsConstructor @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class AbstractNetworkPacketWriter> - implements NetworkPacketWriter { +public abstract class AbstractNetworkPacketWriter< + W extends WritableNetworkPacket, + C extends Connection> implements NetworkPacketWriter { - final CompletionHandler writeHandler = new CompletionHandler<>() { + final CompletionHandler> writeHandler = new CompletionHandler<>() { @Override - public void completed(Integer result, @Nullable WritableNetworkPacket packet) { + public void completed(Integer result, @Nullable WritableNetworkPacket packet) { handleSuccessfulWritingData(result, packet); } @Override - public void failed(Throwable exc, @Nullable WritableNetworkPacket packet) { + public void failed(Throwable exc, @Nullable WritableNetworkPacket packet) { handleFailedWritingData(exc, packet); } }; @@ -64,18 +65,18 @@ public void failed(Throwable exc, @Nullable WritableNetworkPacket packet) { volatile ByteBuffer writingBuffer = EMPTY_BUFFER; final Runnable updateActivityFunction; - final Supplier<@Nullable WritableNetworkPacket> writablePacketProvider; - final ObjBoolConsumer sentPacketHandler; - final Consumer serializedToChannelPacketHandler; + final Supplier<@Nullable WritableNetworkPacket> writablePacketProvider; + final ObjBoolConsumer> sentPacketHandler; + final Consumer> serializedToChannelPacketHandler; public AbstractNetworkPacketWriter( C connection, AsynchronousSocketChannel socketChannel, BufferAllocator bufferAllocator, Runnable updateActivityFunction, - Supplier<@Nullable WritableNetworkPacket> packetProvider, - Consumer serializedToChannelPacketHandler, - ObjBoolConsumer sentPacketHandler) { + Supplier<@Nullable WritableNetworkPacket> packetProvider, + Consumer> serializedToChannelPacketHandler, + ObjBoolConsumer> sentPacketHandler) { this.connection = connection; this.socketChannel = socketChannel; this.bufferAllocator = bufferAllocator; @@ -125,7 +126,9 @@ protected boolean tryToSendNextPacketImpl() { return false; } - protected boolean writeBuffer(ByteBuffer resultBuffer, @Nullable WritableNetworkPacket nextPacket) { + protected boolean writeBuffer( + ByteBuffer resultBuffer, + @Nullable WritableNetworkPacket nextPacket) { if (resultBuffer.limit() == 0) { return false; } @@ -141,15 +144,15 @@ protected boolean writeBuffer(ByteBuffer resultBuffer, @Nullable WritableNetwork * @param packet the network packet. * @return the final byte buffer with data. */ - protected ByteBuffer serialize(WritableNetworkPacket packet) { + protected ByteBuffer serialize(WritableNetworkPacket packet) { - if (packet instanceof WritablePacketWrapper) { - packet = ((WritablePacketWrapper) packet).getPacket(); + if (packet instanceof WritablePacketWrapper wrapper) { + packet = wrapper.getPacket(); } W resultPacket = (W) packet; - int expectedLength = packet.expectedLength(); + int expectedLength = packet.expectedLength(connection); int totalSize = expectedLength == -1 ? -1 : totalSize(packet, expectedLength); // if the packet is too big to use a write buffer @@ -185,7 +188,7 @@ protected ByteBuffer serialize(WritableNetworkPacket packet) { * * @return the total size or -1. */ - protected abstract int totalSize(WritableNetworkPacket packet, int expectedLength); + protected abstract int totalSize(WritableNetworkPacket packet, int expectedLength); /** * Serializes the packet to the buffers. @@ -251,7 +254,7 @@ protected boolean doSerialize( int totalSize, ByteBuffer firstBuffer, ByteBuffer secondBuffer) { - return packet.write(firstBuffer); + return packet.write(connection, firstBuffer); } /** @@ -339,13 +342,16 @@ protected ByteBuffer writeHeader(ByteBuffer buffer, int value, int headerSize) { * @param wroteBytes the count of wrote bytes. * @param packet the sent packet. */ - protected void handleSuccessfulWritingData(Integer wroteBytes, @Nullable WritableNetworkPacket packet) { + protected void handleSuccessfulWritingData(Integer wroteBytes, @Nullable WritableNetworkPacket packet) { updateActivityFunction.run(); if (wroteBytes == -1) { if (packet != null) { sentPacketHandler.accept(packet, false); } + if (writing.compareAndSet(true, false)) { + clearTempBuffers(); + } connection.close(); return; } @@ -354,7 +360,14 @@ protected void handleSuccessfulWritingData(Integer wroteBytes, @Nullable Writabl if (writingBuffer.remaining() > 0) { log.debug(remoteAddress(), writingBuffer, "[%s] Buffer was not consumed fully, try to write else [%s] bytes to channel"::formatted); - socketChannel.write(writingBuffer, packet, writeHandler); + try { + socketChannel.write(writingBuffer, packet, writeHandler); + } catch (RuntimeException ex) { + log.error(ex); + if (writing.compareAndSet(true, false)) { + clearTempBuffers(); + } + } return; } else { log.debug(remoteAddress(), wroteBytes, "[%s] Finished writing [%s] bytes"::formatted); @@ -376,7 +389,7 @@ protected void handleSuccessfulWritingData(Integer wroteBytes, @Nullable Writabl * @param exception the exception. * @param packet the packet. */ - protected void handleFailedWritingData(Throwable exception, @Nullable WritableNetworkPacket packet) { + protected void handleFailedWritingData(Throwable exception, @Nullable WritableNetworkPacket packet) { log.error(new RuntimeException("Failed writing packet:" + packet, exception)); if (exception instanceof IOException) { connection.close(); @@ -397,6 +410,7 @@ public void close() { .putWriteBuffer(secondWriteBuffer); clearTempBuffers(); writingBuffer = EMPTY_BUFFER; + writing.compareAndSet(true, false); } protected void clearTempBuffers() { diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractReadableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractReadableNetworkPacket.java index 2b3b720c..c1fd1f84 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractReadableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractReadableNetworkPacket.java @@ -2,7 +2,6 @@ import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; -import javasabr.rlib.common.util.ClassUtils; import javasabr.rlib.network.Connection; import javasabr.rlib.network.packet.ReadableNetworkPacket; import lombok.AccessLevel; @@ -16,27 +15,27 @@ */ @CustomLog @NoArgsConstructor(access = AccessLevel.PROTECTED) -public abstract class AbstractReadableNetworkPacket - extends AbstractNetworkPacket implements ReadableNetworkPacket { +public abstract class AbstractReadableNetworkPacket> + extends AbstractNetworkPacket implements ReadableNetworkPacket { @Override - public boolean read(ByteBuffer buffer, int remainingDataLength) { + public boolean read(C connection, ByteBuffer buffer, int remainingDataLength) { int oldLimit = buffer.limit(); int oldPosition = buffer.position(); try { buffer.limit(oldPosition + remainingDataLength); - readImpl(buffer); + readImpl(connection, buffer); return true; } catch (Exception e) { buffer.position(oldPosition); - handleException(buffer, e); + handleException(connection, buffer, e); return false; } finally { buffer.limit(oldLimit); } } - protected void readImpl(ByteBuffer buffer) {} + protected void readImpl(C connection, ByteBuffer buffer) {} /** * Reads 1 byte from the buffer. @@ -45,6 +44,23 @@ protected int readByte(ByteBuffer buffer) { return buffer.get(); } + /** + * Reads 1 byte from the buffer. + */ + protected int readByteUnsigned(ByteBuffer buffer) { + return buffer.get() & 0xFF; + } + + /** + * Fills byte array with data from the buffer. + */ + protected byte[] readByteArray(ByteBuffer buffer) { + int size = readInt(buffer); + byte[] result = new byte[size]; + buffer.get(result); + return result; + } + /** * Fills byte array with data from the buffer. */ @@ -80,6 +96,14 @@ protected int readInt(ByteBuffer buffer) { return buffer.getInt(); } + /** + * Reads 4 bytes from buffer. + */ + protected long readIntUnsigned(ByteBuffer buffer) { + return buffer.getInt() & 0xFFFFFFFFL; + } + + /** * Reads 8 bytes from buffer. */ @@ -94,6 +118,13 @@ protected int readShort(ByteBuffer buffer) { return buffer.getShort(); } + /** + * Reads 2 bytes from buffer. + */ + protected int readShortUnsigned(ByteBuffer buffer) { + return buffer.getShort() & 0xFFFF; + } + /** * Read a string from buffer. */ diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractReusableWritableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractReusableWritableNetworkPacket.java index 30c057a2..84f40ee5 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractReusableWritableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractReusableWritableNetworkPacket.java @@ -7,6 +7,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import javasabr.rlib.common.util.ClassUtils; +import javasabr.rlib.network.Connection; import javasabr.rlib.network.packet.ReusableWritablePacket; import javasabr.rlib.reusable.pool.Pool; import javasabr.rlib.reusable.pool.PoolFactory; @@ -23,8 +24,8 @@ */ @CustomLog @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class AbstractReusableWritableNetworkPacket extends AbstractWritableNetworkPacket - implements ReusableWritablePacket { +public abstract class AbstractReusableWritableNetworkPacket> + extends AbstractWritableNetworkPacket implements ReusableWritablePacket { protected static final ThreadLocal, Pool>> LOCAL_POOLS = ThreadLocal.withInitial(HashMap::new); @@ -35,7 +36,7 @@ public abstract class AbstractReusableWritableNetworkPacket extends AbstractWrit * The pool to store this packet after using. */ @Nullable - volatile Pool pool; + volatile Pool> pool; volatile int barrier; int barrierSink; @@ -45,7 +46,7 @@ public AbstractReusableWritableNetworkPacket() { } @Override - public boolean write(ByteBuffer buffer) { + public boolean write(C connection, ByteBuffer buffer) { if (counter.get() < 1) { log.warning(this, arg -> @@ -55,7 +56,7 @@ public boolean write(ByteBuffer buffer) { notifyStartedWriting(); try { - super.write(buffer); + super.write(connection, buffer); } finally { notifyFinishedWriting(); } @@ -123,11 +124,10 @@ public void forceComplete() { * * @return thread local pool. */ - protected Pool threadLocalPool() { - Class packetClass = ClassUtils.unsafeNNCast(getClass()); - return LOCAL_POOLS - .get() - .computeIfAbsent(packetClass, PoolFactory::newLockBasePool); + protected Pool> threadLocalPool() { + Pool resultPool = LOCAL_POOLS.get() + .computeIfAbsent(ClassUtils.unsafeNNCast(getClass()), PoolFactory::newLockBasePool); + return ClassUtils.unsafeCast(resultPool); } /** @@ -135,9 +135,9 @@ protected Pool threadLocalPool() { * * @return the pool to store used packet. */ - protected Pool getPool() { + protected Pool> getPool() { - Pool local = this.pool; + Pool> local = this.pool; if (local != null) { return local; } @@ -166,17 +166,17 @@ protected void completeImpl() { * @param the result packet's type. * @return the new instance. */ - public T newInstance() { + public > T newInstance() { - Pool pool = getPool(); - ReusableWritablePacket result = pool.take(getClass(), ClassUtils::newInstance); + Pool> pool = getPool(); + ReusableWritablePacket result = pool.take(getClass(), ClassUtils::newInstance); result.setPool(pool); return notNull(ClassUtils.unsafeCast(result)); } @Override - public final void setPool(Pool pool) { + public final void setPool(Pool> pool) { this.pool = pool; } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java index fdaeb2db..89b3ec68 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketReader.java @@ -7,7 +7,7 @@ import java.util.function.Consumer; import javasabr.rlib.common.util.BufferUtils; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; +import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.ReadableNetworkPacket; import javasabr.rlib.network.packet.WritableNetworkPacket; import javasabr.rlib.network.util.NetworkUtils; @@ -28,8 +28,9 @@ @CustomLog @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class AbstractSslNetworkPacketReader> - extends AbstractNetworkPacketReader { +public abstract class AbstractSslNetworkPacketReader< + R extends ReadableNetworkPacket, + C extends UnsafeConnection> extends AbstractNetworkPacketReader { private static final ByteBuffer[] EMPTY_BUFFERS = { NetworkUtils.EMPTY_BUFFER @@ -38,7 +39,7 @@ public abstract class AbstractSslNetworkPacketReader packetWriter; + final Consumer> packetWriter; @Getter(value = AccessLevel.PROTECTED) volatile ByteBuffer sslNetworkBuffer; @@ -54,7 +55,7 @@ protected AbstractSslNetworkPacketReader( Runnable updateActivityFunction, Consumer readPacketHandler, SSLEngine sslEngine, - Consumer packetWriter, + Consumer> packetWriter, int maxPacketsByRead) { super(connection, channel, bufferAllocator, updateActivityFunction, readPacketHandler, maxPacketsByRead); this.sslEngine = sslEngine; @@ -157,7 +158,7 @@ protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) { } case NEED_WRAP: { log.debug(remoteAddress, "[%s] Send command to wrap data"::formatted); - packetWriter.accept(SslWrapRequestPacket.getInstance()); + packetWriter.accept(SslWrapRequestNetworkPacket.getInstance()); NetworkUtils.cleanNetworkBuffer(networkBuffer); return SKIP_READ_PACKETS; } @@ -179,7 +180,7 @@ protected int doHandshake(ByteBuffer networkBuffer, int receivedBytes) { if (!networkBuffer.hasRemaining()) { // if buffer is empty and status is FINISHED then we can notify writer if (handshakeStatus == HandshakeStatus.FINISHED) { - packetWriter.accept(SslWrapRequestPacket.getInstance()); + packetWriter.accept(SslWrapRequestNetworkPacket.getInstance()); } NetworkUtils.cleanNetworkBuffer(networkBuffer); return SKIP_READ_PACKETS; diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java index bbd649e3..b759aea6 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractSslNetworkPacketWriter.java @@ -27,15 +27,16 @@ @CustomLog @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class AbstractSslNetworkPacketWriter> - extends AbstractNetworkPacketWriter { +public abstract class AbstractSslNetworkPacketWriter< + W extends WritableNetworkPacket, + C extends Connection> extends AbstractNetworkPacketWriter { private static final ByteBuffer[] EMPTY_BUFFERS = { NetworkUtils.EMPTY_BUFFER }; final SSLEngine sslEngine; - final Consumer queueAtFirst; + final Consumer> queueAtFirst; @Getter(AccessLevel.PROTECTED) @Nullable @@ -49,11 +50,11 @@ public AbstractSslNetworkPacketWriter( AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, Runnable updateActivityFunction, - Supplier packetProvider, - Consumer serializedToChannelPacketHandler, - ObjBoolConsumer sentPacketHandler, + Supplier> packetProvider, + Consumer> serializedToChannelPacketHandler, + ObjBoolConsumer> sentPacketHandler, SSLEngine sslEngine, - Consumer queueAtFirst) { + Consumer> queueAtFirst) { super( connection, channel, @@ -94,8 +95,8 @@ protected boolean tryToSendNextPacketImpl() { } @Override - protected ByteBuffer serialize(WritableNetworkPacket packet) { - if (packet instanceof SslWrapRequestPacket) { + protected ByteBuffer serialize(WritableNetworkPacket packet) { + if (packet instanceof SslWrapRequestNetworkPacket) { return EMPTY_BUFFER; } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractWritableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractWritableNetworkPacket.java index 7bf754f6..f1b0452b 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractWritableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/AbstractWritableNetworkPacket.java @@ -2,6 +2,7 @@ import java.nio.BufferOverflowException; import java.nio.ByteBuffer; +import javasabr.rlib.network.Connection; import javasabr.rlib.network.packet.WritableNetworkPacket; import lombok.CustomLog; @@ -11,16 +12,16 @@ * @author JavaSaBr */ @CustomLog -public abstract class AbstractWritableNetworkPacket extends AbstractNetworkPacket - implements WritableNetworkPacket { +public abstract class AbstractWritableNetworkPacket> + extends AbstractNetworkPacket implements WritableNetworkPacket { @Override - public boolean write(ByteBuffer buffer) { + public boolean write(C connection, ByteBuffer buffer) { try { - writeImpl(buffer); + writeImpl(connection, buffer); return true; } catch (Exception e) { - handleException(buffer, e); + handleException(connection, buffer, e); return false; } } @@ -28,7 +29,7 @@ public boolean write(ByteBuffer buffer) { /** * The process of writing this packet to the buffer. */ - protected void writeImpl(ByteBuffer buffer) {} + protected void writeImpl(C connection, ByteBuffer buffer) {} /** * Write 1 byte to the buffer. @@ -58,6 +59,13 @@ protected void writeFloat(ByteBuffer buffer, float value) { buffer.putFloat(value); } + /** + * Write 8 bytes to the buffer. + */ + protected void writeDouble(ByteBuffer buffer, double value) { + buffer.putDouble(value); + } + /** * Write 4 bytes to the buffer. */ @@ -79,6 +87,29 @@ protected void writeShort(ByteBuffer buffer, int value) { buffer.putShort((short) value); } + /** + * Writes bytes to the buffer. + */ + protected void writeBytes(ByteBuffer buffer, byte[] bytes) { + buffer.put(bytes); + } + + /** + * Writes bytes to the buffer. + */ + protected void writeBytes(ByteBuffer buffer, byte[] bytes, int offset, int length) { + buffer.put(bytes, offset, length); + } + + /** + * Writes bytes to the buffer. + */ + protected void writeByteArray(ByteBuffer buffer, byte[] bytes) { + buffer.putInt(bytes.length); + buffer.put(bytes); + } + + /** * Writes the string to the buffer. */ diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketReader.java index 5eb079e5..4fd8b975 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketReader.java @@ -5,7 +5,7 @@ import java.util.function.Consumer; import java.util.function.IntFunction; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; +import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.ReadableNetworkPacket; import lombok.AccessLevel; import lombok.experimental.FieldDefaults; @@ -15,8 +15,9 @@ * @author JavaSaBR */ @FieldDefaults(level = AccessLevel.PROTECTED) -public class DefaultNetworkPacketReader> - extends AbstractNetworkPacketReader { +public class DefaultNetworkPacketReader< + R extends ReadableNetworkPacket, + C extends UnsafeConnection> extends AbstractNetworkPacketReader { final IntFunction readablePacketFactory; final int packetLengthHeaderSize; diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java index fe6a1e99..9234df50 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultNetworkPacketWriter.java @@ -16,8 +16,9 @@ * @author JavaSaBr */ @FieldDefaults(level = AccessLevel.PROTECTED) -public class DefaultNetworkPacketWriter> - extends AbstractNetworkPacketWriter { +public class DefaultNetworkPacketWriter< + W extends WritableNetworkPacket, + C extends Connection> extends AbstractNetworkPacketWriter { final int packetLengthHeaderSize; @@ -26,9 +27,9 @@ public DefaultNetworkPacketWriter( AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, Runnable updateActivityFunction, - Supplier<@Nullable WritableNetworkPacket> packetProvider, - Consumer serializedToChannelPacketHandler, - ObjBoolConsumer sentPacketHandler, + Supplier<@Nullable WritableNetworkPacket> packetProvider, + Consumer> serializedToChannelPacketHandler, + ObjBoolConsumer> sentPacketHandler, int packetLengthHeaderSize) { super( connection, @@ -42,7 +43,7 @@ public DefaultNetworkPacketWriter( } @Override - protected int totalSize(WritableNetworkPacket packet, int expectedLength) { + protected int totalSize(WritableNetworkPacket packet, int expectedLength) { return expectedLength + packetLengthHeaderSize; } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultReadableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultReadableNetworkPacket.java index 71115e89..ba88583e 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultReadableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultReadableNetworkPacket.java @@ -1,9 +1,9 @@ package javasabr.rlib.network.packet.impl; +import javasabr.rlib.network.Connection; + /** * @author JavaSaBr */ -public class DefaultReadableNetworkPacket extends - AbstractIdBasedReadableNetworkPacket { - +public class DefaultReadableNetworkPacket> extends AbstractIdBasedReadableNetworkPacket { } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketReader.java index 0f46f607..f7c15fd7 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketReader.java @@ -5,7 +5,7 @@ import java.util.function.Consumer; import java.util.function.IntFunction; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; +import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.ReadableNetworkPacket; import javasabr.rlib.network.packet.WritableNetworkPacket; import javax.net.ssl.SSLEngine; @@ -17,8 +17,9 @@ * @author JavaSaBR */ @FieldDefaults(level = AccessLevel.PROTECTED) -public class DefaultSslNetworkPacketReader> extends - AbstractSslNetworkPacketReader { +public class DefaultSslNetworkPacketReader< + R extends ReadableNetworkPacket, + C extends UnsafeConnection> extends AbstractSslNetworkPacketReader { final IntFunction packetResolver; final int packetLengthHeaderSize; @@ -31,7 +32,7 @@ public DefaultSslNetworkPacketReader( Consumer packetHandler, IntFunction packetResolver, SSLEngine sslEngine, - Consumer packetWriter, + Consumer> packetWriter, int packetLengthHeaderSize, int maxPacketsByRead) { super( diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java index 52438cbe..44f93818 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultSslNetworkPacketWriter.java @@ -17,8 +17,9 @@ * @author JavaSaBr */ @FieldDefaults(level = AccessLevel.PROTECTED) -public class DefaultSslNetworkPacketWriter> extends - AbstractSslNetworkPacketWriter { +public class DefaultSslNetworkPacketWriter< + W extends WritableNetworkPacket, + C extends Connection> extends AbstractSslNetworkPacketWriter { final int packetLengthHeaderSize; @@ -27,11 +28,11 @@ public DefaultSslNetworkPacketWriter( AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, Runnable updateActivityFunction, - Supplier<@Nullable WritableNetworkPacket> nextWritePacketSupplier, - Consumer serializedToChannelPacketHandler, - ObjBoolConsumer sentPacketHandler, + Supplier<@Nullable WritableNetworkPacket> nextWritePacketSupplier, + Consumer> serializedToChannelPacketHandler, + ObjBoolConsumer> sentPacketHandler, SSLEngine sslEngine, - Consumer queueAtFirst, + Consumer> queueAtFirst, int packetLengthHeaderSize) { super( connection, @@ -47,7 +48,7 @@ public DefaultSslNetworkPacketWriter( } @Override - protected int totalSize(WritableNetworkPacket packet, int expectedLength) { + protected int totalSize(WritableNetworkPacket packet, int expectedLength) { return expectedLength + packetLengthHeaderSize; } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultWritableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultWritableNetworkPacket.java index c134ac95..fc2b2071 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultWritableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/DefaultWritableNetworkPacket.java @@ -1,9 +1,10 @@ package javasabr.rlib.network.packet.impl; +import javasabr.rlib.network.Connection; import javasabr.rlib.network.packet.IdBasedWritableNetworkPacket; /** * @author JavaSaBr */ -public class DefaultWritableNetworkPacket extends AbstractWritableNetworkPacket implements - IdBasedWritableNetworkPacket {} +public class DefaultWritableNetworkPacket> extends AbstractWritableNetworkPacket + implements IdBasedWritableNetworkPacket {} diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedPacketReader.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketReader.java similarity index 80% rename from rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedPacketReader.java rename to rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketReader.java index 061b1f04..e1bee478 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedPacketReader.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketReader.java @@ -4,7 +4,7 @@ import java.nio.channels.AsynchronousSocketChannel; import java.util.function.Consumer; import javasabr.rlib.network.BufferAllocator; -import javasabr.rlib.network.Connection; +import javasabr.rlib.network.UnsafeConnection; import javasabr.rlib.network.packet.IdBasedReadableNetworkPacket; import javasabr.rlib.network.packet.registry.ReadableNetworkPacketRegistry; import lombok.AccessLevel; @@ -15,14 +15,15 @@ * @author JavaSaBr */ @FieldDefaults(level = AccessLevel.PROTECTED) -public class IdBasedPacketReader, C extends Connection> extends - AbstractNetworkPacketReader { +public class IdBasedNetworkPacketReader< + R extends IdBasedReadableNetworkPacket, + C extends UnsafeConnection> extends AbstractNetworkPacketReader { - final ReadableNetworkPacketRegistry packetRegistry; + final ReadableNetworkPacketRegistry packetRegistry; final int packetLengthHeaderSize; final int packetIdHeaderSize; - public IdBasedPacketReader( + public IdBasedNetworkPacketReader( C connection, AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, @@ -31,7 +32,7 @@ public IdBasedPacketReader( int packetLengthHeaderSize, int maxPacketsByRead, int packetIdHeaderSize, - ReadableNetworkPacketRegistry packetRegistry) { + ReadableNetworkPacketRegistry packetRegistry) { super(connection, channel, bufferAllocator, updateActivityFunction, packetHandler, maxPacketsByRead); this.packetLengthHeaderSize = packetLengthHeaderSize; this.packetIdHeaderSize = packetIdHeaderSize; @@ -55,7 +56,7 @@ protected R createPacketFor( int startPacketPosition, int packetFullLength, int packetDataLength) { - return packetRegistry + return (R) packetRegistry .resolvePrototypeById(readHeader(buffer, packetIdHeaderSize)) .newInstance(); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java index 7ee8f037..a8e9d767 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/IdBasedNetworkPacketWriter.java @@ -4,9 +4,6 @@ import java.nio.channels.AsynchronousSocketChannel; import java.util.function.Consumer; import java.util.function.Supplier; -import javasabr.rlib.common.function.NotNullBiConsumer; -import javasabr.rlib.common.function.NotNullConsumer; -import javasabr.rlib.common.function.NullableSupplier; import javasabr.rlib.functions.ObjBoolConsumer; import javasabr.rlib.network.BufferAllocator; import javasabr.rlib.network.Connection; @@ -20,8 +17,9 @@ * @author JavaSaBr */ @FieldDefaults(level = AccessLevel.PROTECTED) -public class IdBasedNetworkPacketWriter> - extends DefaultNetworkPacketWriter { +public class IdBasedNetworkPacketWriter< + W extends IdBasedWritableNetworkPacket, + C extends Connection> extends DefaultNetworkPacketWriter { final int packetIdHeaderSize; @@ -30,9 +28,9 @@ public IdBasedNetworkPacketWriter( AsynchronousSocketChannel channel, BufferAllocator bufferAllocator, Runnable updateActivityFunction, - Supplier<@Nullable WritableNetworkPacket> packetProvider, - Consumer serializedToChannelPacketHandler, - ObjBoolConsumer sentPacketHandler, + Supplier<@Nullable WritableNetworkPacket> packetProvider, + Consumer> serializedToChannelPacketHandler, + ObjBoolConsumer> sentPacketHandler, int packetLengthHeaderSize, int packetIdHeaderSize) { super( diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/SslWrapRequestNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/SslWrapRequestNetworkPacket.java new file mode 100644 index 00000000..8781301d --- /dev/null +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/SslWrapRequestNetworkPacket.java @@ -0,0 +1,20 @@ +package javasabr.rlib.network.packet.impl; + +import java.nio.ByteBuffer; +import javasabr.rlib.network.Connection; +import javasabr.rlib.network.packet.MarkerNetworkPacket; + +public class SslWrapRequestNetworkPacket> + extends AbstractWritableNetworkPacket implements MarkerNetworkPacket { + + private static final SslWrapRequestNetworkPacket INSTANCE = new SslWrapRequestNetworkPacket<>(); + + public static > SslWrapRequestNetworkPacket getInstance() { + return (SslWrapRequestNetworkPacket) INSTANCE; + } + + @Override + public boolean write(C connection, ByteBuffer buffer) { + return true; + } +} diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/SslWrapRequestPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/SslWrapRequestPacket.java deleted file mode 100644 index 4d12d963..00000000 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/SslWrapRequestPacket.java +++ /dev/null @@ -1,19 +0,0 @@ -package javasabr.rlib.network.packet.impl; - -import java.nio.ByteBuffer; -import javasabr.rlib.network.packet.MarkerNetworkPacket; - -public class SslWrapRequestPacket extends AbstractWritableNetworkPacket - implements MarkerNetworkPacket { - - private static final SslWrapRequestPacket INSTANCE = new SslWrapRequestPacket(); - - public static SslWrapRequestPacket getInstance() { - return INSTANCE; - } - - @Override - public boolean write(ByteBuffer buffer) { - return true; - } -} diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/StringReadablePacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/StringReadableNetworkPacket.java similarity index 78% rename from rlib-network/src/main/java/javasabr/rlib/network/packet/impl/StringReadablePacket.java rename to rlib-network/src/main/java/javasabr/rlib/network/packet/impl/StringReadableNetworkPacket.java index a896ef20..4cbbbd6e 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/StringReadablePacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/StringReadableNetworkPacket.java @@ -1,6 +1,7 @@ package javasabr.rlib.network.packet.impl; import java.nio.ByteBuffer; +import javasabr.rlib.network.Connection; import lombok.AccessLevel; import lombok.Getter; import lombok.experimental.Accessors; @@ -13,7 +14,7 @@ @Getter @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public class StringReadablePacket extends AbstractReadableNetworkPacket { +public class StringReadableNetworkPacket> extends AbstractReadableNetworkPacket { public static final int MAX_LENGTH = 100_000; @@ -21,7 +22,7 @@ public class StringReadablePacket extends AbstractReadableNetworkPacket { volatile String data; @Override - protected void readImpl(ByteBuffer buffer) { + protected void readImpl(C connection, ByteBuffer buffer) { this.data = readString(buffer, MAX_LENGTH); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/StringWritableNetworkPacket.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/StringWritableNetworkPacket.java index 0c7a4542..14331c63 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/StringWritableNetworkPacket.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/StringWritableNetworkPacket.java @@ -1,6 +1,7 @@ package javasabr.rlib.network.packet.impl; import java.nio.ByteBuffer; +import javasabr.rlib.network.Connection; import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; @@ -14,18 +15,18 @@ @RequiredArgsConstructor @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) -public class StringWritableNetworkPacket extends AbstractWritableNetworkPacket { +public class StringWritableNetworkPacket> extends AbstractWritableNetworkPacket { String data; @Override - protected void writeImpl(ByteBuffer buffer) { - super.writeImpl(buffer); + protected void writeImpl(C connection, ByteBuffer buffer) { + super.writeImpl(connection, buffer); writeString(buffer, data); } @Override - public int expectedLength() { + public int expectedLength(C connection) { return 4 + data.length() * 2; } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/WritablePacketWrapper.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/WritablePacketWrapper.java index 668e1ba1..5440eb52 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/WritablePacketWrapper.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/impl/WritablePacketWrapper.java @@ -1,32 +1,30 @@ package javasabr.rlib.network.packet.impl; import java.nio.ByteBuffer; +import javasabr.rlib.network.Connection; import javasabr.rlib.network.packet.WritableNetworkPacket; import lombok.Getter; import lombok.RequiredArgsConstructor; /** * The writable packet wrapper with additional attachment. - * - * @param the attachment type. - * @param the Writable packet. */ @Getter @RequiredArgsConstructor -public class WritablePacketWrapper - implements WritableNetworkPacket { +public class WritablePacketWrapper> + implements WritableNetworkPacket { private final A attachment; - private final W packet; + private final WritableNetworkPacket packet; @Override - public boolean write(ByteBuffer buffer) { - return packet.write(buffer); + public boolean write(C connection, ByteBuffer buffer) { + return packet.write(connection, buffer); } @Override - public int expectedLength() { - return packet.expectedLength(); + public int expectedLength(C connection) { + return packet.expectedLength(connection); } @Override diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/registry/ReadableNetworkPacketRegistry.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/registry/ReadableNetworkPacketRegistry.java index 3a0cf08a..ddffaf70 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/registry/ReadableNetworkPacketRegistry.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/registry/ReadableNetworkPacketRegistry.java @@ -4,6 +4,8 @@ import javasabr.rlib.classpath.ClassPathScannerFactory; import javasabr.rlib.collections.array.Array; import javasabr.rlib.collections.array.ArrayCollectors; +import javasabr.rlib.common.util.ClassUtils; +import javasabr.rlib.network.Connection; import javasabr.rlib.network.annotation.NetworkPacketDescription; import javasabr.rlib.network.packet.IdBasedReadableNetworkPacket; import javasabr.rlib.network.packet.registry.impl.IdBasedReadableNetworkPacketRegistry; @@ -13,57 +15,65 @@ * * @author JavaSaBr */ -public interface ReadableNetworkPacketRegistry> { +public interface ReadableNetworkPacketRegistry, C extends Connection> { /** * Creates a new empty readable packet registry. */ - static ReadableNetworkPacketRegistry empty() { + static ReadableNetworkPacketRegistry empty() { return new IdBasedReadableNetworkPacketRegistry<>(IdBasedReadableNetworkPacket.class); } /** * Create a new empty readable packet registry. */ - static > ReadableNetworkPacketRegistry empty(Class type) { + static < + R extends IdBasedReadableNetworkPacket, + C extends Connection> ReadableNetworkPacketRegistry empty(Class type) { return new IdBasedReadableNetworkPacketRegistry<>(type); } /** * Creates a new class path scanning based readable packet registry. */ - static ReadableNetworkPacketRegistry classPathBased() { - + static < + R extends IdBasedReadableNetworkPacket, + C extends Connection> ReadableNetworkPacketRegistry classPathBased(Class baseType) { var scanner = ClassPathScannerFactory.newDefaultScanner(); scanner.useSystemClassPath(true); scanner.scan(); - - return of(scanner); + return scannerBased(baseType, scanner); } /** * Creates a new class path scanning based readable packet registry by scanning the main class. */ - static ReadableNetworkPacketRegistry classPathBased(Class mainClass) { - + static < + R extends IdBasedReadableNetworkPacket, + C extends Connection> ReadableNetworkPacketRegistry classPathBased( + Class baseType, + Class mainClass) { var scanner = ClassPathScannerFactory.newManifestScanner(mainClass); scanner.useSystemClassPath(false); scanner.scan(); - - return of(scanner); + return scannerBased(baseType, scanner); } /** * Creates a new class path scanning based readable packet registry. */ - static ReadableNetworkPacketRegistry of(ClassPathScanner scanner) { - - var result = scanner + static < + R extends IdBasedReadableNetworkPacket, + C extends Connection> ReadableNetworkPacketRegistry scannerBased( + Class baseType, + ClassPathScanner scanner) { + Array> result = scanner .findImplementations(IdBasedReadableNetworkPacket.class) .stream() .filter(type -> type.getAnnotation(NetworkPacketDescription.class) != null) - .collect(ArrayCollectors.>toArray(Class.class)); - return new IdBasedReadableNetworkPacketRegistry<>(IdBasedReadableNetworkPacket.class) + .map(ClassUtils::>unsafeNNCast) + .collect(ArrayCollectors.toArray(Class.class)); + return new IdBasedReadableNetworkPacketRegistry<>(baseType) .register(result); } @@ -71,19 +81,34 @@ static ReadableNetworkPacketRegistry of(ClassPathScanner scanner) { * Creates a new readable packet registry. */ @SafeVarargs - static > ReadableNetworkPacketRegistry of( - Class type, - Class... classes) { + static < + R extends IdBasedReadableNetworkPacket, + C extends Connection> ReadableNetworkPacketRegistry of( + Class type, + Class... classes) { return new IdBasedReadableNetworkPacketRegistry<>(type) .register(classes, classes.length); } + @SafeVarargs + static < + R extends IdBasedReadableNetworkPacket, + C extends Connection> ReadableNetworkPacketRegistry of( + Class type, + Class connectionType, + Class... classes) { + return new IdBasedReadableNetworkPacketRegistry<>(ClassUtils.>unsafeCast(type)) + .register(classes, classes.length); + } + /** * Creates a new readable packet registry. */ - static > ReadableNetworkPacketRegistry of( - Class type, - Array> classes) { + static < + R extends IdBasedReadableNetworkPacket, + C extends Connection> ReadableNetworkPacketRegistry of( + Class type, + Array> classes) { return new IdBasedReadableNetworkPacketRegistry<>(type) .register(classes); } diff --git a/rlib-network/src/main/java/javasabr/rlib/network/packet/registry/impl/IdBasedReadableNetworkPacketRegistry.java b/rlib-network/src/main/java/javasabr/rlib/network/packet/registry/impl/IdBasedReadableNetworkPacketRegistry.java index 815911c2..ea22f92f 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/packet/registry/impl/IdBasedReadableNetworkPacketRegistry.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/packet/registry/impl/IdBasedReadableNetworkPacketRegistry.java @@ -7,6 +7,7 @@ import javasabr.rlib.common.util.ArrayUtils; import javasabr.rlib.common.util.ClassUtils; import javasabr.rlib.common.util.ObjectUtils; +import javasabr.rlib.network.Connection; import javasabr.rlib.network.annotation.NetworkPacketDescription; import javasabr.rlib.network.packet.IdBasedReadableNetworkPacket; import javasabr.rlib.network.packet.registry.ReadableNetworkPacketRegistry; @@ -27,8 +28,9 @@ @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) @RequiredArgsConstructor(access = AccessLevel.PROTECTED) -public class IdBasedReadableNetworkPacketRegistry> - implements ReadableNetworkPacketRegistry { +public class IdBasedReadableNetworkPacketRegistry< + R extends IdBasedReadableNetworkPacket, + C extends Connection> implements ReadableNetworkPacketRegistry { @Getter(AccessLevel.PROTECTED) final Class type; @@ -46,7 +48,7 @@ public IdBasedReadableNetworkPacketRegistry(Class type) { * * @throws IllegalArgumentException if found a class without description annotation or if found duplication by id. */ - public IdBasedReadableNetworkPacketRegistry register(Array> classes) { + public IdBasedReadableNetworkPacketRegistry register(Array> classes) { return register(classes.toArray(), classes.size()); } @@ -57,7 +59,7 @@ public IdBasedReadableNetworkPacketRegistry register(Array * id. */ @SafeVarargs - public final IdBasedReadableNetworkPacketRegistry register(Class... classes) { + public final IdBasedReadableNetworkPacketRegistry register(Class... classes) { return register(classes, classes.length); } @@ -66,7 +68,7 @@ public final IdBasedReadableNetworkPacketRegistry register(Class * * @throws IllegalArgumentException if found a class without description annotation or if found duplication by id. */ - public IdBasedReadableNetworkPacketRegistry register(Class[] classes, int length) { + public IdBasedReadableNetworkPacketRegistry register(Class[] classes, int length) { Optional> incorrectClass = Arrays .stream(classes, 0, length) @@ -114,7 +116,7 @@ public IdBasedReadableNetworkPacketRegistry register(Class[] cla * @throws IllegalArgumentException if this class doesn't have {@link NetworkPacketDescription}, wrong id or some class is * already presented with the same id. */ - public IdBasedReadableNetworkPacketRegistry register(Class cs) { + public IdBasedReadableNetworkPacketRegistry register(Class cs) { return register(cs, () -> ClassUtils.newInstance(cs)); } @@ -125,7 +127,7 @@ public IdBasedReadableNetworkPacketRegistry register(Class cs) { * @throws IllegalArgumentException if this class doesn't have {@link NetworkPacketDescription}, wrong id or some class is * already presented with the same id. */ - public

IdBasedReadableNetworkPacketRegistry register(Class

cs, Supplier

factory) { + public

IdBasedReadableNetworkPacketRegistry register(Class

cs, Supplier

factory) { var description = cs.getAnnotation(NetworkPacketDescription.class); if (description == null) { diff --git a/rlib-network/src/main/java/javasabr/rlib/network/server/ServerNetwork.java b/rlib-network/src/main/java/javasabr/rlib/network/server/ServerNetwork.java index 1fa6aa30..158e672e 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/server/ServerNetwork.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/server/ServerNetwork.java @@ -11,7 +11,7 @@ * * @author JavaSaBr */ -public interface ServerNetwork> extends Network { +public interface ServerNetwork> extends Network { /** * Start a server using any available address. diff --git a/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java b/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java index 46e6e2a2..3a2ef320 100644 --- a/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java +++ b/rlib-network/src/main/java/javasabr/rlib/network/server/impl/DefaultServerNetwork.java @@ -11,6 +11,8 @@ import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -29,6 +31,8 @@ import javasabr.rlib.network.util.NetworkUtils; import lombok.AccessLevel; import lombok.CustomLog; +import lombok.Getter; +import lombok.experimental.Accessors; import lombok.experimental.FieldDefaults; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; @@ -39,11 +43,12 @@ * @author JavaSaBr */ @CustomLog +@Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true) -public class DefaultServerNetwork> +public class DefaultServerNetwork> extends AbstractNetwork implements ServerNetwork { - private interface ServerCompletionHandler> extends + private interface ServerCompletionHandler> extends CompletionHandler> {} private final ServerCompletionHandler acceptHandler = new ServerCompletionHandler<>() { @@ -70,6 +75,9 @@ public void failed(Throwable exc, DefaultServerNetwork network) { } }; + @Getter + ScheduledExecutorService scheduledExecutor; + AsynchronousChannelGroup group; AsynchronousServerSocketChannel channel; MutableArray> subscribers; @@ -79,6 +87,7 @@ public DefaultServerNetwork( BiFunction, AsynchronousSocketChannel, C> channelToConnection) { super(config, channelToConnection); this.group = Utils.uncheckedGet(buildExecutor(config), AsynchronousChannelGroup::withThreadPool); + this.scheduledExecutor = buildScheduledExecutor(config); this.channel = Utils.uncheckedGet(group, AsynchronousServerSocketChannel::open); this.subscribers = ArrayFactory.copyOnModifyArray(Consumer.class); log.info(config, DefaultServerNetwork::buildConfigDescription); @@ -183,9 +192,32 @@ protected ExecutorService buildExecutor(ServerNetworkConfig config) { return executorService; } + protected ScheduledExecutorService buildScheduledExecutor(ServerNetworkConfig config) { + + var threadFactory = new GroupThreadFactory( + config.scheduledThreadGroupName(), + config.threadConstructor(), + config.threadPriority(), + false); + + ScheduledExecutorService executorService; + if (config.threadGroupMinSize() < config.threadGroupMaxSize()) { + executorService = new ScheduledThreadPoolExecutor( + config.scheduledThreadGroupSize(), + threadFactory, + new ThreadPoolExecutor.CallerRunsPolicy()); + } else { + executorService = Executors.newScheduledThreadPool(config.threadGroupMinSize(), threadFactory); + } + + // activate the executor + executorService.submit(() -> {}); + return executorService; + } + private static String buildConfigDescription(ServerNetworkConfig conf) { return "Server network configuration: {\n" + " minThreads: " + conf.threadGroupMinSize() + ",\n" + " maxThreads: " - + conf.threadGroupMaxSize() + ",\n" + " priority: " + conf.threadPriority() + ",\n" + " groupName: \"" + + conf.threadGroupMaxSize() + ",\n" + " priority: " + conf.threadPriority() + ",\n" + " threadGroupName: \"" + conf.threadGroupName() + "\",\n" + " readBufferSize: " + conf.readBufferSize() + ",\n" + " pendingBufferSize: " + conf.pendingBufferSize() + ",\n" + " writeBufferSize: " + conf.writeBufferSize() + "\n" + "}"; diff --git a/rlib-network/src/test/java/javasabr/rlib/network/BaseNetworkTest.java b/rlib-network/src/test/java/javasabr/rlib/network/BaseNetworkTest.java index 9fc8f5b4..47e9bdf1 100644 --- a/rlib-network/src/test/java/javasabr/rlib/network/BaseNetworkTest.java +++ b/rlib-network/src/test/java/javasabr/rlib/network/BaseNetworkTest.java @@ -1,24 +1,76 @@ package javasabr.rlib.network; import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import javasabr.rlib.common.util.ClassUtils; import javasabr.rlib.network.client.ClientNetwork; import javasabr.rlib.network.impl.DefaultBufferAllocator; import javasabr.rlib.network.impl.DefaultConnection; import javasabr.rlib.network.impl.StringDataConnection; import javasabr.rlib.network.impl.StringDataSslConnection; +import javasabr.rlib.network.packet.ReadableNetworkPacket; +import javasabr.rlib.network.packet.WritableNetworkPacket; import javasabr.rlib.network.packet.impl.DefaultReadableNetworkPacket; +import javasabr.rlib.network.packet.impl.StringReadableNetworkPacket; import javasabr.rlib.network.packet.registry.ReadableNetworkPacketRegistry; import javasabr.rlib.network.server.ServerNetwork; import javax.net.ssl.SSLContext; import lombok.AllArgsConstructor; +import reactor.core.publisher.Flux; /** * @author JavaSaBr */ public class BaseNetworkTest { + protected static final Class> RECEIVED_PACKET_TYPE = + ClassUtils.unsafeCast(StringReadableNetworkPacket.class); + + public static class MockConnection implements Connection { + @Override + public String remoteAddress() { + return ""; + } + + @Override + public long lastActivity() { + return 0; + } + + @Override + public void close() {} + + @Override + public boolean closed() { + return false; + } + + @Override + public Flux>> receivedEvents() { + return Flux.empty(); + } + + @Override + public void send(WritableNetworkPacket packet) {} + + @Override + public CompletableFuture sendWithFeedback(WritableNetworkPacket packet) { + return CompletableFuture.completedFuture(false); + } + + @Override + public Flux receivedPackets() { + return Flux.empty(); + } + + @Override + public void onReceive(BiConsumer consumer) {} + } + + public static final MockConnection MOCK_CONNECTION = new MockConnection(); + @AllArgsConstructor - public static class TestNetwork> implements AutoCloseable { + public static class TestNetwork> implements AutoCloseable { public final ServerNetworkConfig serverNetworkConfig; public final NetworkConfig clientNetworkConfig; @@ -146,8 +198,8 @@ protected TestNetwork buildStringSSLNetwork( } protected TestNetwork buildDefaultNetwork( - ReadableNetworkPacketRegistry serverPacketRegistry, - ReadableNetworkPacketRegistry clientPacketRegistry) { + ReadableNetworkPacketRegistry, DefaultConnection> serverPacketRegistry, + ReadableNetworkPacketRegistry, DefaultConnection> clientPacketRegistry) { return buildDefaultNetwork( ServerNetworkConfig.DEFAULT_SERVER, new DefaultBufferAllocator(ServerNetworkConfig.DEFAULT_SERVER), @@ -159,9 +211,9 @@ protected TestNetwork buildDefaultNetwork( protected TestNetwork buildDefaultNetwork( BufferAllocator serverBufferAllocator, - ReadableNetworkPacketRegistry serverPacketRegistry, + ReadableNetworkPacketRegistry, DefaultConnection> serverPacketRegistry, BufferAllocator clientBufferAllocator, - ReadableNetworkPacketRegistry clientPacketRegistry) { + ReadableNetworkPacketRegistry, DefaultConnection> clientPacketRegistry) { return buildDefaultNetwork( ServerNetworkConfig.DEFAULT_SERVER, serverBufferAllocator, @@ -174,10 +226,10 @@ protected TestNetwork buildDefaultNetwork( protected TestNetwork buildDefaultNetwork( ServerNetworkConfig serverNetworkConfig, BufferAllocator serverBufferAllocator, - ReadableNetworkPacketRegistry serverPacketRegistry, + ReadableNetworkPacketRegistry, DefaultConnection> serverPacketRegistry, NetworkConfig clientNetworkConfig, BufferAllocator clientBufferAllocator, - ReadableNetworkPacketRegistry clientPacketRegistry) { + ReadableNetworkPacketRegistry, DefaultConnection> clientPacketRegistry) { var asyncClientToServer = new CompletableFuture(); var asyncServerToClient = new CompletableFuture(); diff --git a/rlib-network/src/test/java/javasabr/rlib/network/DefaultNetworkTest.java b/rlib-network/src/test/java/javasabr/rlib/network/DefaultNetworkTest.java index 43d86534..707d636e 100644 --- a/rlib-network/src/test/java/javasabr/rlib/network/DefaultNetworkTest.java +++ b/rlib-network/src/test/java/javasabr/rlib/network/DefaultNetworkTest.java @@ -19,6 +19,7 @@ import javasabr.rlib.network.impl.DefaultBufferAllocator; import javasabr.rlib.network.impl.DefaultConnection; import javasabr.rlib.network.packet.MarkerNetworkPacket; +import javasabr.rlib.network.packet.ReadableNetworkPacket; import javasabr.rlib.network.packet.impl.DefaultReadableNetworkPacket; import javasabr.rlib.network.packet.impl.DefaultWritableNetworkPacket; import javasabr.rlib.network.packet.registry.ReadableNetworkPacketRegistry; @@ -44,30 +45,31 @@ interface ClientPackets { @RequiredArgsConstructor @NetworkPacketDescription(id = 1) - class RequestEchoMessage extends DefaultWritableNetworkPacket { + class RequestEchoMessage extends DefaultWritableNetworkPacket { private final String message; @Override - protected void writeImpl(ByteBuffer buffer) { - super.writeImpl(buffer); + protected void writeImpl(DefaultConnection connection, ByteBuffer buffer) { + super.writeImpl(connection, buffer); writeString(buffer, message); } } @NetworkPacketDescription(id = 2) - class RequestServerTime extends DefaultWritableNetworkPacket implements MarkerNetworkPacket {} + class RequestServerTime extends DefaultWritableNetworkPacket + implements MarkerNetworkPacket {} @RequiredArgsConstructor @NetworkPacketDescription(id = 3) - class ResponseEchoMessage extends DefaultReadableNetworkPacket { + class ResponseEchoMessage extends DefaultReadableNetworkPacket { @Getter @Nullable private volatile String message; @Override - protected void readImpl(ByteBuffer buffer) { + protected void readImpl(DefaultConnection connection, ByteBuffer buffer) { message = readString(buffer, Integer.MAX_VALUE); } @@ -78,15 +80,15 @@ public String toString() { } @NetworkPacketDescription(id = 4) - class ResponseServerTime extends DefaultReadableNetworkPacket { + class ResponseServerTime extends DefaultReadableNetworkPacket { @Getter @Nullable private volatile LocalDateTime localDateTime; @Override - protected void readImpl(ByteBuffer buffer) { - super.readImpl(buffer); + protected void readImpl(DefaultConnection connection, ByteBuffer buffer) { + super.readImpl(connection, buffer); localDateTime = LocalDateTime.ofEpochSecond(readLong(buffer), 0, ZoneOffset.ofTotalSeconds(readInt(buffer))); } @@ -101,13 +103,13 @@ public String toString() { interface ServerPackets { @NetworkPacketDescription(id = 1) - class RequestEchoMessage extends DefaultReadableNetworkPacket { + class RequestEchoMessage extends DefaultReadableNetworkPacket { private volatile String message; @Override - protected void readImpl(ByteBuffer buffer) { - super.readImpl(buffer); + protected void readImpl(DefaultConnection connection, ByteBuffer buffer) { + super.readImpl(connection, buffer); message = readString(buffer, Integer.MAX_VALUE); } @@ -118,7 +120,8 @@ public String toString() { } @NetworkPacketDescription(id = 2) - class RequestServerTime extends DefaultReadableNetworkPacket implements MarkerNetworkPacket { + class RequestServerTime extends DefaultReadableNetworkPacket + implements MarkerNetworkPacket { @Override public String toString() { @@ -128,23 +131,23 @@ public String toString() { @RequiredArgsConstructor @NetworkPacketDescription(id = 3) - class ResponseEchoMessage extends DefaultWritableNetworkPacket { + class ResponseEchoMessage extends DefaultWritableNetworkPacket { private final String message; @Override - protected void writeImpl(ByteBuffer buffer) { - super.writeImpl(buffer); + protected void writeImpl(DefaultConnection connection, ByteBuffer buffer) { + super.writeImpl(connection, buffer); writeString(buffer, "Echo: " + message); } } @NetworkPacketDescription(id = 4) - class ResponseServerTime extends DefaultWritableNetworkPacket { + class ResponseServerTime extends DefaultWritableNetworkPacket { @Override - protected void writeImpl(ByteBuffer buffer) { - super.writeImpl(buffer); + protected void writeImpl(DefaultConnection connection, ByteBuffer buffer) { + super.writeImpl(connection, buffer); var dateTime = ZonedDateTime.now(); writeLong(buffer, dateTime.toEpochSecond()); writeInt(buffer, dateTime.getOffset().getTotalSeconds()); @@ -159,12 +162,14 @@ void echoNetworkTest() { //LoggerManager.enable(DefaultNetworkTest.class, LoggerLevel.INFO); //LoggerManager.enable(AbstractNetworkPacketReader.class, LoggerLevel.DEBUG); - ReadableNetworkPacketRegistry serverPackets = ReadableNetworkPacketRegistry.of( + var serverPackets = ReadableNetworkPacketRegistry.of( DefaultReadableNetworkPacket.class, + DefaultConnection.class, ServerPackets.RequestEchoMessage.class, ServerPackets.RequestServerTime.class); - ReadableNetworkPacketRegistry clientPackets = ReadableNetworkPacketRegistry.of( + var clientPackets = ReadableNetworkPacketRegistry.of( DefaultReadableNetworkPacket.class, + DefaultConnection.class, ClientPackets.ResponseEchoMessage.class, ClientPackets.ResponseServerTime.class); @@ -218,10 +223,12 @@ void shouldNotUseMappedBuffers() { var serverPacketRegistry = ReadableNetworkPacketRegistry.of( DefaultReadableNetworkPacket.class, + DefaultConnection.class, ServerPackets.RequestEchoMessage.class, ServerPackets.RequestServerTime.class); var clientPacketRegistry = ReadableNetworkPacketRegistry.of( DefaultReadableNetworkPacket.class, + DefaultConnection.class, ClientPackets.ResponseEchoMessage.class, ClientPackets.ResponseServerTime.class); @@ -265,7 +272,7 @@ public ByteBuffer takeBuffer(int bufferSize) { .peek(message -> clientToServer.send(new ClientPackets.RequestEchoMessage(message))) .toList(); - List receivedPackets = + List> receivedPackets = ObjectUtils.notNull(pendingPacketsOnServer.blockFirst(Duration.ofSeconds(5))); Assertions.assertEquals(packetCount, receivedPackets.size(), "Didn't receive all packets"); diff --git a/rlib-network/src/test/java/javasabr/rlib/network/IdBasedReadableNetworkPacketRegistryTest.java b/rlib-network/src/test/java/javasabr/rlib/network/IdBasedReadableNetworkPacketRegistryTest.java index c30249ee..0213d8cb 100644 --- a/rlib-network/src/test/java/javasabr/rlib/network/IdBasedReadableNetworkPacketRegistryTest.java +++ b/rlib-network/src/test/java/javasabr/rlib/network/IdBasedReadableNetworkPacketRegistryTest.java @@ -30,7 +30,7 @@ public static class Impl2 extends DefaultReadableNetworkPacket {} public static class Impl3 extends DefaultReadableNetworkPacket {} @NoArgsConstructor - private static class PrivateBase extends AbstractIdBasedReadableNetworkPacket {} + private static class PrivateBase extends AbstractIdBasedReadableNetworkPacket {} @NoArgsConstructor @NetworkPacketDescription(id = 1) @@ -41,7 +41,7 @@ private static class PrivateImpl1 extends PrivateBase {} private static class PrivateImpl2 extends PrivateBase {} @NoArgsConstructor - public static class PublicBase extends AbstractIdBasedReadableNetworkPacket {} + public static class PublicBase extends AbstractIdBasedReadableNetworkPacket {} @NoArgsConstructor @NetworkPacketDescription(id = 1) diff --git a/rlib-network/src/test/java/javasabr/rlib/network/NetworkPacketTest.java b/rlib-network/src/test/java/javasabr/rlib/network/NetworkPacketTest.java new file mode 100644 index 00000000..413518b6 --- /dev/null +++ b/rlib-network/src/test/java/javasabr/rlib/network/NetworkPacketTest.java @@ -0,0 +1,178 @@ +package javasabr.rlib.network; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import javasabr.rlib.network.BaseNetworkTest.MockConnection; +import javasabr.rlib.network.packet.impl.AbstractReadableNetworkPacket; +import javasabr.rlib.network.packet.impl.AbstractWritableNetworkPacket; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class NetworkPacketTest { + + @Builder + @ToString + @EqualsAndHashCode(exclude = {"byteArrayField", "bytesField1", "bytesField2"}) + private static class TestReadablePacket + extends AbstractReadableNetworkPacket { + + private int byteField; + private int byteUnsignedField; + private int shortField; + private int shortUnsignedField; + private int intField; + private long intUnsignedField; + private long longField; + private float floatField; + private double doubleField; + private byte[] byteArrayField; + private byte[] bytesField1; + private byte[] bytesField2; + private String stringField; + + @Override + protected void readImpl(MockConnection connection, ByteBuffer buffer) { + super.readImpl(connection, buffer); + byteField = readByte(buffer); + byteUnsignedField = readByteUnsigned(buffer); + shortField = readShort(buffer); + shortUnsignedField = readShortUnsigned(buffer); + intField = readInt(buffer); + intUnsignedField = readIntUnsigned(buffer); + longField = readLong(buffer); + floatField = readFloat(buffer); + doubleField = readDouble(buffer); + byteArrayField = readByteArray(buffer); + int bytesCount = readInt(buffer); + bytesField1 = new byte[bytesCount]; + readBytes(buffer, bytesField1); + bytesCount = readInt(buffer); + bytesField2 = new byte[bytesCount]; + readBytes(buffer, bytesField2, 0, bytesCount); + stringField = readString(buffer, Integer.MAX_VALUE); + } + } + + @Builder + private static class TestWritablePacket + extends AbstractWritableNetworkPacket { + + private int byteField; + private int byteUnsignedField; + private int shortField; + private int shortUnsignedField; + private int intField; + private long intUnsignedField; + private long longField; + private float floatField; + private double doubleField; + private byte[] byteArrayField; + private byte[] bytesField1; + private byte[] bytesField2; + private String stringField; + + @Override + protected void writeImpl(MockConnection connection, ByteBuffer buffer) { + super.writeImpl(connection, buffer); + writeByte(buffer, byteField); + writeByte(buffer, byteUnsignedField); + writeShort(buffer, shortField); + writeShort(buffer, shortUnsignedField); + writeInt(buffer, intField); + writeInt(buffer, (int) intUnsignedField); + writeLong(buffer, longField); + writeFloat(buffer, floatField); + writeDouble(buffer, doubleField); + writeByteArray(buffer, byteArrayField); + writeInt(buffer, bytesField1.length); + writeBytes(buffer, bytesField1); + writeInt(buffer, bytesField2.length); + writeBytes(buffer, bytesField2, 0, bytesField2.length); + writeString(buffer, stringField); + } + } + + @Test + @DisplayName("should write packet correctly") + void shouldWritePacketCorrectly() { + + // given: + TestWritablePacket packet = TestWritablePacket + .builder() + .byteField(100) + .byteUnsignedField(200) + .shortField(25600) + .shortUnsignedField(44000) + .intField(500_400) + .intUnsignedField(3_345_123_436L) + .longField(4_564_869_376L) + .floatField(1.55F) + .doubleField(5.23) + .byteArrayField(new byte[] {4, 6, 9}) + .bytesField1(new byte[] {9, 1, 5, 8, 9}) + .bytesField2(new byte[] {3, 1, 1, 8, 9, 9, 5}) + .stringField("test string") + .build(); + + var buffer = ByteBuffer.allocate(256); + byte[] expected = {100, -56, 100, 0, -85, -32, 0, 7, -94, -80, -57, 98, -120, 108, + 0, 0, 0, 1, 16, 22, 97, 0, 63, -58, 102, 102, 64, 20, -21, -123, + 30, -72, 81, -20, 0, 0, 0, 3, 4, 6, 9, 0, 0, 0, 5, 9, 1, 5, + 8, 9, 0, 0, 0, 7, 3, 1, 1, 8, 9, 9, 5, 0, 0, 0, 11, 0, 116, + 0, 101, 0, 115, 0, 116, 0, 32, 0, 115, 0, 116, 0, 114, 0, + 105, 0, 110, 0, 103}; + + // when: + packet.write(BaseNetworkTest.MOCK_CONNECTION, buffer); + buffer.flip(); + + // then: + byte[] wroteBytes = Arrays.copyOf(buffer.array(), buffer.limit()); + Assertions.assertArrayEquals(expected, wroteBytes); + } + + @Test + @DisplayName("should read packet correctly") + void shouldReadPacketCorrectly() { + + // given: + byte[] data = {100, -56, 100, 0, -85, -32, 0, 7, -94, -80, -57, 98, -120, 108, + 0, 0, 0, 1, 16, 22, 97, 0, 63, -58, 102, 102, 64, 20, -21, -123, + 30, -72, 81, -20, 0, 0, 0, 3, 4, 6, 9, 0, 0, 0, 5, 9, 1, 5, + 8, 9, 0, 0, 0, 7, 3, 1, 1, 8, 9, 9, 5, 0, 0, 0, 11, 0, 116, + 0, 101, 0, 115, 0, 116, 0, 32, 0, 115, 0, 116, 0, 114, 0, + 105, 0, 110, 0, 103}; + + TestReadablePacket packet = TestReadablePacket.builder().build(); + + TestReadablePacket expected = TestReadablePacket + .builder() + .byteField(100) + .byteUnsignedField(200) + .shortField(25600) + .shortUnsignedField(44000) + .intField(500_400) + .intUnsignedField(3_345_123_436L) + .longField(4_564_869_376L) + .floatField(1.55F) + .doubleField(5.23) + .byteArrayField(new byte[] {4, 6, 9}) + .bytesField1(new byte[] {9, 1, 5, 8, 9}) + .bytesField2(new byte[] {3, 1, 1, 8, 9, 9, 5}) + .stringField("test string") + .build(); + + // when: + packet.read(BaseNetworkTest.MOCK_CONNECTION, ByteBuffer.wrap(data), data.length); + + // then: + Assertions.assertEquals(expected, packet); + Assertions.assertArrayEquals(expected.byteArrayField, packet.byteArrayField); + Assertions.assertArrayEquals(expected.bytesField1, packet.bytesField1); + Assertions.assertArrayEquals(expected.bytesField2, packet.bytesField2); + } +} diff --git a/rlib-network/src/test/java/javasabr/rlib/network/StringNetworkTest.java b/rlib-network/src/test/java/javasabr/rlib/network/StringNetworkTest.java index 9378b0d5..8de053a2 100644 --- a/rlib-network/src/test/java/javasabr/rlib/network/StringNetworkTest.java +++ b/rlib-network/src/test/java/javasabr/rlib/network/StringNetworkTest.java @@ -21,7 +21,7 @@ import javasabr.rlib.network.client.ClientNetwork; import javasabr.rlib.network.impl.DefaultBufferAllocator; import javasabr.rlib.network.impl.StringDataConnection; -import javasabr.rlib.network.packet.impl.StringReadablePacket; +import javasabr.rlib.network.packet.impl.StringReadableNetworkPacket; import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket; import javasabr.rlib.network.server.ServerNetwork; import lombok.CustomLog; @@ -54,11 +54,11 @@ void echoNetworkTest() { serverNetwork .accepted() - .flatMap(Connection::receivedEvents) + .flatMap(connection -> connection.receivedEvents(RECEIVED_PACKET_TYPE)) .subscribe(event -> { String message = event.packet().data(); log.info(message, "Received from client:[%s]"::formatted); - event.connection().send(new StringWritableNetworkPacket("Echo: " + message)); + event.connection().send(new StringWritableNetworkPacket<>("Echo: " + message)); }); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); @@ -68,13 +68,13 @@ void echoNetworkTest() { .doOnNext(connection -> IntStream .range(10, 200) .forEach(length -> { - var packet = new StringWritableNetworkPacket(StringUtils.generate(length)); + var packet = new StringWritableNetworkPacket(StringUtils.generate(length)); int delay = ThreadLocalRandom .current() .nextInt(50); executor.schedule(() -> connection.send(packet), delay, TimeUnit.MILLISECONDS); })) - .flatMapMany(Connection::receivedEvents) + .flatMapMany(connection -> connection.receivedEvents(RECEIVED_PACKET_TYPE)) .subscribe(event -> { log.info(event.packet().data(), "Received from server:[%s]"::formatted); counter.countDown(); @@ -119,7 +119,7 @@ public ByteBuffer takeBuffer(int bufferSize) { StringDataConnection serverToClient = testNetwork.serverToClient; var pendingPacketsOnServer = serverToClient - .receivedPackets() + .receivedPackets(RECEIVED_PACKET_TYPE) .buffer(packetCount); List messages = IntStream @@ -127,11 +127,11 @@ public ByteBuffer takeBuffer(int bufferSize) { .mapToObj(value -> StringUtils.generate(random.nextInt(0, bufferSize))) .peek(message -> { log.info(message.length(), "Send [%s] symbols to server"::formatted); - clientToServer.send(new StringWritableNetworkPacket(message)); + clientToServer.send(new StringWritableNetworkPacket<>(message)); }) .toList(); - List receivedPackets = ObjectUtils + List> receivedPackets = ObjectUtils .notNull(pendingPacketsOnServer.blockFirst(Duration.ofSeconds(5))); log.info(receivedPackets.size(), "Received [%s] packets from client"::formatted); @@ -165,7 +165,7 @@ void shouldReceiveManyPacketsFromSmallToBigSize() { StringDataConnection serverToClient = testNetwork.serverToClient; var pendingPacketsOnServer = serverToClient - .receivedPackets() + .receivedPackets(RECEIVED_PACKET_TYPE) .doOnNext(packet -> log.info(packet.data().length(), "Received [%s] symbols from client"::formatted)) .buffer(packetCount); @@ -177,7 +177,7 @@ void shouldReceiveManyPacketsFromSmallToBigSize() { return StringUtils.generate(length); }) .peek(message -> { - var packet = new StringWritableNetworkPacket(message); + var packet = new StringWritableNetworkPacket(message); int delay = ThreadLocalRandom .current() .nextInt(15); @@ -189,7 +189,7 @@ void shouldReceiveManyPacketsFromSmallToBigSize() { }) .toList(); - List receivedPackets = + List> receivedPackets = ObjectUtils.notNull(pendingPacketsOnServer.blockFirst(Duration.ofSeconds(5))); log.info(receivedPackets.size(), "Received [%s] packets from client"::formatted); @@ -223,7 +223,7 @@ void shouldSendBiggerPacketThanWriteBuffer() { StringDataConnection serverToClient = testNetwork.serverToClient; var pendingPacketsOnServer = serverToClient - .receivedPackets() + .receivedPackets(RECEIVED_PACKET_TYPE) .doOnNext(packet -> log.info(packet.data().length(), "Received [%s] symbols from client"::formatted)) .buffer(packetCount); @@ -235,7 +235,7 @@ void shouldSendBiggerPacketThanWriteBuffer() { return StringUtils.generate(length); }) .peek(message -> { - var packet = new StringWritableNetworkPacket(message); + var packet = new StringWritableNetworkPacket(message); int delay = ThreadLocalRandom .current() .nextInt(15); @@ -247,7 +247,7 @@ void shouldSendBiggerPacketThanWriteBuffer() { }) .toList(); - List receivedPackets = + List> receivedPackets = ObjectUtils.notNull(pendingPacketsOnServer.blockFirst(Duration.ofSeconds(5))); log.info(receivedPackets.size(), "Received [%s] packets from client"::formatted); @@ -273,7 +273,7 @@ void testServerWithMultiplyClients() { var serverConfig = SimpleServerNetworkConfig .builder() - .threadGroupSize(10) + .threadGroupMaxSize(10) .build(); var serverAllocator = new DefaultBufferAllocator(serverConfig); @@ -335,7 +335,7 @@ void testServerWithMultiplyClientsUsingOldApi() { var serverNetwork = NetworkFactory.stringDataServerNetwork(SimpleServerNetworkConfig .builder() - .threadGroupSize(10) + .threadGroupMaxSize(10) .build()); InetSocketAddress serverAddress = serverNetwork.start(); @@ -409,7 +409,7 @@ void shouldGetAllPacketWithFeedback() { List> asyncResults = IntStream .range(0, packetCount) .mapToObj(value -> StringUtils.generate(random.nextInt(0, bufferSize))) - .map(message -> clientToServer.sendWithFeedback(new StringWritableNetworkPacket(message))) + .map(message -> clientToServer.sendWithFeedback(new StringWritableNetworkPacket<>(message))) .toList(); CompletableFuture @@ -432,7 +432,7 @@ void shouldGetAllPacketWithFeedback() { } } - private static StringWritableNetworkPacket newMessage(int minMessageLength, int maxMessageLength) { - return new StringWritableNetworkPacket(StringUtils.generate(minMessageLength, maxMessageLength)); + private static StringWritableNetworkPacket newMessage(int minMessageLength, int maxMessageLength) { + return new StringWritableNetworkPacket<>(StringUtils.generate(minMessageLength, maxMessageLength)); } } diff --git a/rlib-network/src/test/java/javasabr/rlib/network/StringSslNetworkTest.java b/rlib-network/src/test/java/javasabr/rlib/network/StringSslNetworkTest.java index da87ee0e..b32ac629 100644 --- a/rlib-network/src/test/java/javasabr/rlib/network/StringSslNetworkTest.java +++ b/rlib-network/src/test/java/javasabr/rlib/network/StringSslNetworkTest.java @@ -18,16 +18,12 @@ import java.util.stream.IntStream; import javasabr.rlib.common.util.ObjectUtils; import javasabr.rlib.common.util.StringUtils; -import javasabr.rlib.common.util.ThreadUtils; import javasabr.rlib.common.util.Utils; -import javasabr.rlib.logger.api.LoggerLevel; -import javasabr.rlib.logger.api.LoggerManager; import javasabr.rlib.network.client.ClientNetwork; import javasabr.rlib.network.impl.DefaultBufferAllocator; import javasabr.rlib.network.impl.StringDataSslConnection; -import javasabr.rlib.network.packet.impl.AbstractSslNetworkPacketReader; -import javasabr.rlib.network.packet.impl.AbstractSslNetworkPacketWriter; -import javasabr.rlib.network.packet.impl.StringReadablePacket; +import javasabr.rlib.network.packet.ReadableNetworkPacket; +import javasabr.rlib.network.packet.impl.StringReadableNetworkPacket; import javasabr.rlib.network.packet.impl.StringWritableNetworkPacket; import javasabr.rlib.network.server.ServerNetwork; import javasabr.rlib.network.util.NetworkUtils; @@ -97,20 +93,19 @@ void serverSslNetworkTest() { .accepted() .flatMap(Connection::receivedEvents) .subscribe(event -> { - var message = event.packet().data(); + var message = ((StringReadableNetworkPacket) event.packet()).data(); log.info(message, "Received from client:[%s]"::formatted); - event.connection().send(new StringWritableNetworkPacket("Echo: " + message)); + event.connection().send(new StringWritableNetworkPacket<>("Echo: " + message)); }); SSLContext clientSslContext = NetworkUtils.createAllTrustedClientSslContext(); SSLSocketFactory sslSocketFactory = clientSslContext.getSocketFactory(); SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(serverAddress.getHostName(), serverAddress.getPort()); - StringWritableNetworkPacket writableNetworkPacket = new StringWritableNetworkPacket("Hello SSL"); - + var writableNetworkPacket = new StringWritableNetworkPacket("Hello SSL"); var buffer = ByteBuffer.allocate(1024); buffer.position(2); - writableNetworkPacket.write(buffer); + writableNetworkPacket.write(null, buffer); buffer.putShort(0, (short) buffer.position()); buffer.flip(); @@ -128,8 +123,8 @@ void serverSslNetworkTest() { .flip(); short packetLength = buffer.getShort(); - StringReadablePacket response = new StringReadablePacket(); - response.read(buffer, packetLength - 2); + var response = new StringReadableNetworkPacket(); + response.read(null, buffer, packetLength - 2); log.info(response.data(), "Response:[%s]"::formatted); @@ -159,11 +154,12 @@ void clientSslNetworkTest() { clientNetwork .connectReactive(new InetSocketAddress("localhost", serverPort)) - .doOnNext(connection -> connection.send(new StringWritableNetworkPacket("Hello SSL"))) + .doOnNext(connection -> connection.send(new StringWritableNetworkPacket<>("Hello SSL"))) .doOnError(Throwable::printStackTrace) .flatMapMany(Connection::receivedEvents) .subscribe(event -> { - log.info(event.packet().data(), "Received from server:[%s]"::formatted); + var message = ((StringReadableNetworkPacket) event.packet()).data(); + log.info(message, "Received from server:[%s]"::formatted); counter.countDown(); }); @@ -180,18 +176,17 @@ void clientSslNetworkTest() { var dataLength = buffer.getShort(); - var receivedPacket = new StringReadablePacket(); - receivedPacket.read(buffer, dataLength); + var receivedPacket = new StringReadableNetworkPacket(); + receivedPacket.read(null, buffer, dataLength); Assertions.assertEquals("Hello SSL", receivedPacket.data()); log.info(receivedPacket.data(), "Received from client:[%s]"::formatted); - StringWritableNetworkPacket writableNetworkPacket = new StringWritableNetworkPacket("Echo: Hello SSL"); - + var writableNetworkPacket = new StringWritableNetworkPacket("Echo: Hello SSL"); buffer.clear(); buffer.position(2); - writableNetworkPacket.write(buffer); + writableNetworkPacket.write(null, buffer); buffer.putShort(0, (short) buffer.position()); buffer.flip(); @@ -233,9 +228,9 @@ void echoNetworkTest() { .accepted() .flatMap(Connection::receivedEvents) .subscribe(event -> { - var message = event.packet().data(); + var message = ((StringReadableNetworkPacket) event.packet()).data(); log.info(message, "Received from client:[%s]"::formatted); - event.connection().send(new StringWritableNetworkPacket("Echo: " + message)); + event.connection().send(new StringWritableNetworkPacket<>("Echo: " + message)); }); SSLContext clientSslContext = NetworkUtils.createAllTrustedClientSslContext(); @@ -262,7 +257,8 @@ void echoNetworkTest() { .doOnError(Throwable::printStackTrace) .flatMapMany(Connection::receivedEvents) .subscribe(event -> { - log.info(event.packet().data(), "Received from server:[%s]"::formatted); + var message = ((StringReadableNetworkPacket) event.packet()).data(); + log.info(message, "Received from server:[%s]"::formatted); counter.countDown(); }); @@ -298,7 +294,7 @@ void shouldReceiveManyPacketsFromSmallToBigSize() { var pendingPacketsOnServer = serverToClient .receivedPackets() - .doOnNext(packet -> log.info("Received from client: " + packet.data())) + .doOnNext(packet -> log.info("Received from client: " + packet)) .buffer(packetCount); var messages = IntStream @@ -307,10 +303,10 @@ void shouldReceiveManyPacketsFromSmallToBigSize() { var length = value % 3 == 0 ? bufferSize : random.nextInt(0, bufferSize / 2 - 1); return StringUtils.generate(length); }) - .peek(message -> clientToServer.send(new StringWritableNetworkPacket(message))) + .peek(message -> clientToServer.send(new StringWritableNetworkPacket<>(message))) .toList(); - List receivedPackets = + List> receivedPackets = ObjectUtils.notNull(pendingPacketsOnServer.blockFirst(Duration.ofSeconds(5000))); Assertions.assertEquals(packetCount, receivedPackets.size(), "Didn't receive all packets"); @@ -319,7 +315,7 @@ void shouldReceiveManyPacketsFromSmallToBigSize() { .stream() .filter(packet -> messages .stream() - .noneMatch(message -> message.equals(packet.data()))) + .noneMatch(message -> message.equals(((StringReadableNetworkPacket) packet).data()))) .findFirst() .orElse(null); @@ -327,7 +323,7 @@ void shouldReceiveManyPacketsFromSmallToBigSize() { } } - private static StringWritableNetworkPacket newMessage(int minMessageLength, int maxMessageLength) { - return new StringWritableNetworkPacket(StringUtils.generate(minMessageLength, maxMessageLength)); + private static StringWritableNetworkPacket newMessage(int minMessageLength, int maxMessageLength) { + return new StringWritableNetworkPacket<>(StringUtils.generate(minMessageLength, maxMessageLength)); } }