1+ package com .eatthepath .pushy .apns ;
2+
3+ import com .eatthepath .pushy .apns .proxy .ProxyHandlerFactory ;
4+ import io .netty .bootstrap .Bootstrap ;
5+ import io .netty .channel .*;
6+ import io .netty .channel .socket .SocketChannel ;
7+ import io .netty .handler .ssl .SslContext ;
8+ import io .netty .handler .ssl .SslHandler ;
9+ import io .netty .resolver .AddressResolverGroup ;
10+ import io .netty .resolver .NoopAddressResolverGroup ;
11+ import io .netty .util .AttributeKey ;
12+ import io .netty .util .ReferenceCounted ;
13+ import io .netty .util .concurrent .Future ;
14+ import io .netty .util .concurrent .Promise ;
15+ import io .netty .util .concurrent .PromiseNotifier ;
16+
17+ import javax .net .ssl .SSLEngine ;
18+ import javax .net .ssl .SSLParameters ;
19+ import java .io .Closeable ;
20+ import java .net .InetSocketAddress ;
21+ import java .net .SocketAddress ;
22+ import java .time .Duration ;
23+ import java .util .concurrent .TimeUnit ;
24+ import java .util .concurrent .atomic .AtomicBoolean ;
25+ import java .util .concurrent .atomic .AtomicLong ;
26+
27+ /**
28+ * An APNs channel factory creates new channels connected to an APNs server. Channels constructed by this factory are
29+ * intended for use in an {@link ApnsChannelPool}.
30+ */
31+ abstract class AbstractApnsChannelFactory implements PooledObjectFactory <Channel >, Closeable {
32+
33+ private final SslContext sslContext ;
34+ private final AtomicBoolean hasReleasedSslContext = new AtomicBoolean (false );
35+
36+ private final AddressResolverGroup <? extends SocketAddress > addressResolverGroup ;
37+
38+ private final Bootstrap bootstrapTemplate ;
39+
40+ private final AtomicLong currentDelaySeconds = new AtomicLong (0 );
41+
42+ private static final long MIN_CONNECT_DELAY_SECONDS = 1 ;
43+ private static final long MAX_CONNECT_DELAY_SECONDS = 60 ;
44+
45+ static final AttributeKey <Promise <Channel >> CHANNEL_READY_PROMISE_ATTRIBUTE_KEY =
46+ AttributeKey .valueOf (ApnsNotificationChannelFactory .class , "channelReadyPromise" );
47+
48+ AbstractApnsChannelFactory (final InetSocketAddress serverAddress ,
49+ final SslContext sslContext ,
50+ final ProxyHandlerFactory proxyHandlerFactory ,
51+ final boolean hostnameVerificationEnabled ,
52+ final Duration connectionTimeout ,
53+ final ApnsClientResources clientResources ) {
54+
55+ this .sslContext = sslContext ;
56+
57+ if (this .sslContext instanceof ReferenceCounted ) {
58+ ((ReferenceCounted ) this .sslContext ).retain ();
59+ }
60+
61+ this .addressResolverGroup = proxyHandlerFactory != null
62+ ? NoopAddressResolverGroup .INSTANCE
63+ : clientResources .getRoundRobinDnsAddressResolverGroup ();
64+
65+ this .bootstrapTemplate = new Bootstrap ();
66+ this .bootstrapTemplate .group (clientResources .getEventLoopGroup ());
67+ this .bootstrapTemplate .option (ChannelOption .TCP_NODELAY , true );
68+ this .bootstrapTemplate .remoteAddress (serverAddress );
69+ this .bootstrapTemplate .resolver (this .addressResolverGroup );
70+
71+ if (connectionTimeout != null ) {
72+ this .bootstrapTemplate .option (ChannelOption .CONNECT_TIMEOUT_MILLIS , (int ) connectionTimeout .toMillis ());
73+ }
74+
75+ this .bootstrapTemplate .handler (new ChannelInitializer <SocketChannel >() {
76+
77+ @ Override
78+ protected void initChannel (final SocketChannel channel ) {
79+ final String authority = serverAddress .getHostName ();
80+ final SslHandler sslHandler = sslContext .newHandler (channel .alloc (), authority , serverAddress .getPort ());
81+
82+ if (hostnameVerificationEnabled ) {
83+ final SSLEngine sslEngine = sslHandler .engine ();
84+ final SSLParameters sslParameters = sslEngine .getSSLParameters ();
85+ sslParameters .setEndpointIdentificationAlgorithm ("HTTPS" );
86+ sslEngine .setSSLParameters (sslParameters );
87+ }
88+
89+ constructPipeline (sslHandler , channel .pipeline ());
90+ }
91+ });
92+ }
93+
94+ protected abstract void constructPipeline (final SslHandler sslHandler , final ChannelPipeline pipeline );
95+
96+ /**
97+ * Creates and connects a new channel. The initial connection attempt may be delayed to accommodate exponential
98+ * back-off requirements.
99+ *
100+ * @param channelReadyPromise the promise to be notified when a channel has been created and connected to the APNs
101+ * server
102+ *
103+ * @return a future that will be notified once a channel has been created and connected to the APNs server
104+ */
105+ @ Override
106+ public Future <Channel > create (final Promise <Channel > channelReadyPromise ) {
107+ final long delay = this .currentDelaySeconds .get ();
108+
109+ channelReadyPromise .addListener (future -> {
110+ final long updatedDelay = future .isSuccess () ? 0 :
111+ Math .max (Math .min (delay * 2 , MAX_CONNECT_DELAY_SECONDS ), MIN_CONNECT_DELAY_SECONDS );
112+
113+ this .currentDelaySeconds .compareAndSet (delay , updatedDelay );
114+ });
115+
116+
117+ this .bootstrapTemplate .config ().group ().schedule (() -> {
118+ final Bootstrap bootstrap = this .bootstrapTemplate .clone ()
119+ .channelFactory (new AugmentingReflectiveChannelFactory <>(
120+ ClientChannelClassUtil .getSocketChannelClass (this .bootstrapTemplate .config ().group ()),
121+ CHANNEL_READY_PROMISE_ATTRIBUTE_KEY , channelReadyPromise ));
122+
123+ final ChannelFuture connectFuture = bootstrap .connect ();
124+
125+ connectFuture .addListener (future -> {
126+ if (!future .isSuccess ()) {
127+ channelReadyPromise .tryFailure (future .cause ());
128+ }
129+ });
130+ }, delay , TimeUnit .SECONDS );
131+
132+ return channelReadyPromise ;
133+ }
134+
135+ /**
136+ * Destroys a channel by closing it.
137+ *
138+ * @param channel the channel to destroy
139+ * @param promise the promise to notify when the channel has been destroyed
140+ *
141+ * @return a future that will be notified when the channel has been destroyed
142+ */
143+ @ Override
144+ public Future <Void > destroy (final Channel channel , final Promise <Void > promise ) {
145+ channel .close ().addListener (new PromiseNotifier <>(promise ));
146+ return promise ;
147+ }
148+
149+ @ Override
150+ public void close () {
151+ try {
152+ this .addressResolverGroup .close ();
153+ } finally {
154+ if (this .sslContext instanceof ReferenceCounted ) {
155+ if (this .hasReleasedSslContext .compareAndSet (false , true )) {
156+ ((ReferenceCounted ) this .sslContext ).release ();
157+ }
158+ }
159+ }
160+ }
161+ }
0 commit comments