|
1331 | 1331 | :io-uring IOUringServerSocketChannel |
1332 | 1332 | :nio NioServerSocketChannel)) |
1333 | 1333 |
|
| 1334 | +(defn- wrapping-channel-factory |
| 1335 | + ^ChannelFactory [listen-socket transport] |
| 1336 | + (proxy [ChannelFactory] [] |
| 1337 | + (newChannel [] |
| 1338 | + (case transport |
| 1339 | + :epoll (EpollServerSocketChannel. ^long listen-socket) |
| 1340 | + :kqueue (KQueueServerSocketChannel. ^long listen-socket) |
| 1341 | + :nio (NioServerSocketChannel. ^java.nio.channels.ServerSocketChannel listen-socket))))) |
| 1342 | + |
| 1343 | +(defn- validate-listen-socket |
| 1344 | + [listen-socket transport] |
| 1345 | + (when (some? listen-socket) |
| 1346 | + (case transport |
| 1347 | + (:epoll :kqueue) (when-not (int? listen-socket) |
| 1348 | + (throw (IllegalArgumentException. |
| 1349 | + (str "With epoll and kqueue transports, only a numeric file descriptor " |
| 1350 | + "is supported as listen-socket, but received: " |
| 1351 | + (pr-str listen-socket))))) |
| 1352 | + |
| 1353 | + :nio (cond (not (instance? java.nio.channels.ServerSocketChannel listen-socket)) |
| 1354 | + (throw (IllegalArgumentException. |
| 1355 | + (str "With NIO transport, only a java.nio.channels.ServerSocketChannel " |
| 1356 | + "is supported as listen-socket, but received: " |
| 1357 | + (pr-str listen-socket)))) |
| 1358 | + |
| 1359 | + (nil? (.getLocalAddress ^java.nio.channels.ServerSocketChannel listen-socket)) |
| 1360 | + (throw (IllegalArgumentException. |
| 1361 | + (str "The listen-socket is not bound: " (pr-str listen-socket))))) |
| 1362 | + |
| 1363 | + (throw (IllegalArgumentException. |
| 1364 | + (str "The listen-socket option is not supported with this transport: " |
| 1365 | + (pr-str transport))))))) |
| 1366 | + |
1334 | 1367 | (defn ^:no-doc convert-address-types [address-types] |
1335 | 1368 | (case address-types |
1336 | 1369 | :ipv4-only ResolvedAddressTypes/IPV4_ONLY |
|
1668 | 1701 | (.addFirst pipeline "channel-tracker" ^ChannelHandler (channel-tracking-handler chan-group)) |
1669 | 1702 | (pipeline-builder pipeline))) |
1670 | 1703 |
|
1671 | | -(defn- validate-existing-channel |
1672 | | - [existing-channel] |
1673 | | - (when (some? existing-channel) |
1674 | | - (when-not (instance? java.nio.channels.ServerSocketChannel existing-channel) |
1675 | | - (throw (IllegalArgumentException. |
1676 | | - (str "The existing-channel type is not supported: " (pr-str existing-channel))))) |
1677 | | - (when (nil? (.getLocalAddress ^java.nio.channels.ServerSocketChannel existing-channel)) |
1678 | | - (throw (IllegalArgumentException. |
1679 | | - (str "The existing-channel is not bound: " (pr-str existing-channel))))))) |
1680 | | - |
1681 | | -(defn- wrapping-channel-factory |
1682 | | - ^ChannelFactory [^java.nio.channels.ServerSocketChannel channel] |
1683 | | - (proxy [ChannelFactory] [] |
1684 | | - (newChannel [] |
1685 | | - (NioServerSocketChannel. channel)))) |
1686 | | - |
1687 | 1704 | (defn ^:no-doc start-server |
1688 | 1705 | ([pipeline-builder |
1689 | 1706 | ssl-context |
|
1702 | 1719 | bootstrap-transform |
1703 | 1720 | on-close |
1704 | 1721 | ^SocketAddress socket-address |
1705 | | - existing-channel |
| 1722 | + listen-socket |
1706 | 1723 | transport |
1707 | 1724 | shutdown-timeout] |
1708 | 1725 | :or {shutdown-timeout default-shutdown-timeout} |
1709 | 1726 | :as opts}] |
1710 | 1727 | (ensure-transport-available! transport) |
1711 | | - (validate-existing-channel existing-channel) |
| 1728 | + (validate-listen-socket listen-socket transport) |
1712 | 1729 | (let [num-cores (.availableProcessors (Runtime/getRuntime)) |
1713 | 1730 | num-threads (* 2 num-cores) |
1714 | 1731 | thread-factory (enumerating-thread-factory "aleph-server-pool" false) |
|
1734 | 1751 | (.option ChannelOption/SO_REUSEADDR true) |
1735 | 1752 | (.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) |
1736 | 1753 | (.group group) |
1737 | | - (cond-> (nil? existing-channel) (.channel channel-class)) |
1738 | | - (cond-> (some? existing-channel) (.channelFactory (wrapping-channel-factory existing-channel))) |
| 1754 | + (cond-> (nil? listen-socket) (.channel channel-class)) |
| 1755 | + (cond-> (some? listen-socket) (.channelFactory (wrapping-channel-factory listen-socket transport))) |
1739 | 1756 | ;;TODO: add a server (.handler) call to the bootstrap, for logging or something |
1740 | 1757 | (.childHandler (pipeline-initializer pipeline-builder)) |
1741 | 1758 | (.childOption ChannelOption/SO_REUSEADDR true) |
1742 | 1759 | (.childOption ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) |
1743 | 1760 | bootstrap-transform) |
1744 | 1761 |
|
1745 | 1762 | ^ServerSocketChannel |
1746 | | - ch (-> (if (nil? existing-channel) |
| 1763 | + ch (-> (if (nil? listen-socket) |
1747 | 1764 | (.bind b socket-address) |
1748 | 1765 | (.register b)) |
1749 | 1766 | .sync |
|
1755 | 1772 | (when (compare-and-set! closed? false true) |
1756 | 1773 | ;; This is the three step closing sequence: |
1757 | 1774 | ;; 1. Stop listening to incoming requests |
1758 | | - (if (nil? existing-channel) |
| 1775 | + (if (nil? listen-socket) |
1759 | 1776 | (-> ch .close .sync) |
1760 | 1777 | (-> ch .deregister .sync)) |
1761 | 1778 | (-> (if (pos? shutdown-timeout) |
|
1785 | 1802 | (port [_] |
1786 | 1803 | (-> ch .localAddress .getPort)) |
1787 | 1804 | (wait-for-close [_] |
1788 | | - (when (nil? existing-channel) |
| 1805 | + (when (nil? listen-socket) |
1789 | 1806 | (-> ch .closeFuture .await)) |
1790 | 1807 | (-> group .terminationFuture .await) |
1791 | 1808 | nil))) |
|
0 commit comments