|
21 | 21 | Unpooled) |
22 | 22 | (io.netty.channel |
23 | 23 | Channel |
| 24 | + ChannelFactory |
24 | 25 | ChannelFuture |
25 | 26 | ChannelHandler |
26 | 27 | ChannelHandlerContext |
|
1667 | 1668 | (.addFirst pipeline "channel-tracker" ^ChannelHandler (channel-tracking-handler chan-group)) |
1668 | 1669 | (pipeline-builder pipeline))) |
1669 | 1670 |
|
| 1671 | +(defn- validate-existing-channel |
| 1672 | + [existing-channel] |
| 1673 | + (when (some? existing-channel) |
| 1674 | + (when-not (instance? java.nio.channels.ServerSocketChannel existing-channel) |
| 1675 | + (throw (IllegalArgumentException. |
| 1676 | + (str "The existing-channel type is not supported: " (pr-str existing-channel))))) |
| 1677 | + (when (nil? (.getLocalAddress ^java.nio.channels.ServerSocketChannel existing-channel)) |
| 1678 | + (throw (IllegalArgumentException. |
| 1679 | + (str "The existing-channel is not bound: " (pr-str existing-channel))))))) |
| 1680 | + |
| 1681 | +(defn- wrapping-channel-factory |
| 1682 | + ^ChannelFactory [^java.nio.channels.ServerSocketChannel channel] |
| 1683 | + (proxy [ChannelFactory] [] |
| 1684 | + (newChannel [] |
| 1685 | + (NioServerSocketChannel. channel)))) |
| 1686 | + |
1670 | 1687 | (defn ^:no-doc start-server |
1671 | 1688 | ([pipeline-builder |
1672 | 1689 | ssl-context |
|
1685 | 1702 | bootstrap-transform |
1686 | 1703 | on-close |
1687 | 1704 | ^SocketAddress socket-address |
| 1705 | + existing-channel |
1688 | 1706 | transport |
1689 | 1707 | shutdown-timeout] |
1690 | 1708 | :or {shutdown-timeout default-shutdown-timeout} |
1691 | 1709 | :as opts}] |
1692 | 1710 | (ensure-transport-available! transport) |
| 1711 | + (validate-existing-channel existing-channel) |
1693 | 1712 | (let [num-cores (.availableProcessors (Runtime/getRuntime)) |
1694 | 1713 | num-threads (* 2 num-cores) |
1695 | 1714 | thread-factory (enumerating-thread-factory "aleph-server-pool" false) |
|
1715 | 1734 | (.option ChannelOption/SO_REUSEADDR true) |
1716 | 1735 | (.option ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) |
1717 | 1736 | (.group group) |
1718 | | - (.channel channel-class) |
| 1737 | + (cond-> (nil? existing-channel) (.channel channel-class)) |
| 1738 | + (cond-> (some? existing-channel) (.channelFactory (wrapping-channel-factory existing-channel))) |
1719 | 1739 | ;;TODO: add a server (.handler) call to the bootstrap, for logging or something |
1720 | 1740 | (.childHandler (pipeline-initializer pipeline-builder)) |
1721 | 1741 | (.childOption ChannelOption/SO_REUSEADDR true) |
1722 | 1742 | (.childOption ChannelOption/MAX_MESSAGES_PER_READ Integer/MAX_VALUE) |
1723 | 1743 | bootstrap-transform) |
1724 | 1744 |
|
1725 | 1745 | ^ServerSocketChannel |
1726 | | - ch (-> b (.bind socket-address) .sync .channel)] |
| 1746 | + ch (-> (if (nil? existing-channel) |
| 1747 | + (.bind b socket-address) |
| 1748 | + (.register b)) |
| 1749 | + .sync |
| 1750 | + .channel)] |
1727 | 1751 |
|
1728 | 1752 | (reify |
1729 | 1753 | Closeable |
1730 | 1754 | (close [_] |
1731 | 1755 | (when (compare-and-set! closed? false true) |
1732 | 1756 | ;; This is the three step closing sequence: |
1733 | 1757 | ;; 1. Stop listening to incoming requests |
1734 | | - (-> ch .close .sync) |
| 1758 | + (if (nil? existing-channel) |
| 1759 | + (-> ch .close .sync) |
| 1760 | + (-> ch .deregister .sync)) |
1735 | 1761 | (-> (if (pos? shutdown-timeout) |
1736 | 1762 | ;; 2. Wait for in-flight requests to stop processing within the supplied timeout |
1737 | 1763 | ;; interval. |
|
1759 | 1785 | (port [_] |
1760 | 1786 | (-> ch .localAddress .getPort)) |
1761 | 1787 | (wait-for-close [_] |
1762 | | - (-> ch .closeFuture .await) |
| 1788 | + (when (nil? existing-channel) |
| 1789 | + (-> ch .closeFuture .await)) |
1763 | 1790 | (-> group .terminationFuture .await) |
1764 | 1791 | nil))) |
1765 | 1792 |
|
|
0 commit comments