Skip to content

Commit 25c0f6f

Browse files
authored
Execute email sending in its own zone. (#8047)
1 parent e662907 commit 25c0f6f

File tree

1 file changed

+53
-19
lines changed

1 file changed

+53
-19
lines changed

app/lib/frontend/email_sender.dart

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import 'package:http/http.dart' as http;
1313
import 'package:logging/logging.dart';
1414
import 'package:mailer/mailer.dart';
1515
import 'package:mailer/smtp_server.dart';
16+
import 'package:pool/pool.dart';
1617
import 'package:retry/retry.dart';
1718

1819
import '../shared/email.dart';
@@ -129,20 +130,29 @@ class _GmailSmtpRelay implements EmailSender {
129130
final _connectionsBySender = <String, Future<_GmailConnection>>{};
130131
final _forceReconnectSenders = <String>{};
131132

133+
/// The current zone when this [_GmailSmtpRelay] instance was created.
134+
final Zone _parentZone;
135+
final _pool = Pool(1);
136+
132137
DateTime _accessTokenRefreshed = DateTime(0);
133138
DateTime _backoffUntil = DateTime(0);
134139
Future<String>? _accessToken;
135140

136141
_GmailSmtpRelay(
137142
this._serviceAccountEmail,
138143
this._authClient,
139-
);
144+
) : _parentZone = Zone.current;
140145

141146
@override
142147
bool get shouldBackoff => clock.now().isBefore(_backoffUntil);
143148

144149
@override
145150
Future<void> sendMessage(EmailMessage message) async {
151+
// One attempt at a time.
152+
await _pool.withResource(() async => _sendMessage(message));
153+
}
154+
155+
Future<void> _sendMessage(EmailMessage message) async {
146156
final debugHeader = 'Message-ID:${message.localMessageId}@pub.dev '
147157
'(${message.subject}) '
148158
'from ${message.from} '
@@ -153,11 +163,7 @@ class _GmailSmtpRelay implements EmailSender {
153163
await retry(
154164
() async {
155165
final c = await _getConnection(sender);
156-
try {
157-
await c.connection.send(_toMessage(message));
158-
} finally {
159-
c.trackSentEmail(message.recipients.length);
160-
}
166+
await c.send(_toMessage(message));
161167
},
162168
retryIf: (e) =>
163169
e is TimeoutException ||
@@ -193,14 +199,11 @@ class _GmailSmtpRelay implements EmailSender {
193199
return old;
194200
}
195201
final newConnectionFuture = Future.microtask(() async {
196-
if (old != null) {
197-
try {
198-
await old.connection.close();
199-
} catch (e, st) {
200-
_logger.warning('Unable to close SMTP connection.', e, st);
201-
}
202-
}
202+
// closing the old connection if there was any, ignoring errors
203+
await old?.close();
204+
203205
return _GmailConnection(
206+
_parentZone,
204207
PersistentConnection(
205208
await _getSmtpServer(sender),
206209
timeout: Duration(seconds: 15),
@@ -280,20 +283,28 @@ class _GmailSmtpRelay implements EmailSender {
280283

281284
class _GmailConnection {
282285
final DateTime created;
283-
final PersistentConnection connection;
286+
final PersistentConnection _connection;
287+
final Zone _parentZone;
284288
DateTime _lastUsed;
285289
var _sentCount = 0;
290+
Object? _uncaughtError;
286291

287-
_GmailConnection(this.connection)
292+
_GmailConnection(this._parentZone, this._connection)
288293
: created = clock.now(),
289294
_lastUsed = clock.now();
290295

291-
void trackSentEmail(int count) {
292-
_sentCount += count;
293-
_lastUsed = clock.now();
294-
}
296+
late final _zone = _parentZone.fork(specification: ZoneSpecification(
297+
handleUncaughtError: (self, parent, zone, error, stackTrace) {
298+
_uncaughtError = error;
299+
_logger.severe('Uncaught error while sending email', error, stackTrace);
300+
},
301+
));
295302

296303
bool get isExpired {
304+
// The connection is in an unknown state, better not use it.
305+
if (_uncaughtError != null) {
306+
return true;
307+
}
297308
// There is a 100-recipient limit per SMTP transaction for smtp-relay.gmail.com.
298309
// Exceeding this limit results in an error message. To send messages to
299310
// additional recipients, start another transaction (new SMTP connection or RSET command).
@@ -310,4 +321,27 @@ class _GmailConnection {
310321
}
311322
return false;
312323
}
324+
325+
Future<SendReport> send(Message message) async {
326+
_sentCount += message.recipients.length + message.ccRecipients.length;
327+
try {
328+
final r = await _zone.run(() async => await _connection.send(message));
329+
if (_uncaughtError != null) {
330+
throw EmailSenderException.failed();
331+
}
332+
return r;
333+
} finally {
334+
_lastUsed = clock.now();
335+
}
336+
}
337+
338+
Future<void> close() async {
339+
try {
340+
await _zone.run(() async {
341+
await _connection.close();
342+
});
343+
} catch (e, st) {
344+
_logger.warning('Unable to close SMTP connection.', e, st);
345+
}
346+
}
313347
}

0 commit comments

Comments
 (0)