Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.1.0"
authors = ["Julius de Bruijn <julius@nauk.io>"]
license = "Apache-2.0"
readme = "README.md"
edition = "2018"
description = "A consumer to send push notifications from Kafka"
keywords = ["apns", "fcm", "web-push", "consumer", "kafka"]
repository = "https://github.com/xray-tech/xorc-notifications"
Expand Down
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
extern crate protoc_rust;
use protoc_rust;

fn main() {
protoc_rust::run(protoc_rust::Args {
Expand Down
7 changes: 0 additions & 7 deletions examples/send_http_request.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,3 @@
extern crate common;
extern crate clap;
extern crate rdkafka;
extern crate chrono;
extern crate protobuf;
extern crate futures;

use rdkafka::{
config::ClientConfig,
producer::future_producer::{
Expand Down
8 changes: 4 additions & 4 deletions src/apns2/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use common::{

use a2::{client::Endpoint, error::Error};

use notifier::Notifier;
use producer::ApnsProducer;
use crate::notifier::Notifier;
use crate::producer::ApnsProducer;

pub struct ApnsHandler {
producer: ApnsProducer,
Expand Down Expand Up @@ -110,7 +110,7 @@ impl EventHandler for ApnsHandler {
&self,
key: Option<Vec<u8>>,
event: PushNotification,
) -> Box<Future<Item = (), Error = ()> + 'static + Send> {
) -> Box<dyn Future<Item = (), Error = ()> + 'static + Send> {
let producer = self.producer.clone();
let timer = RESPONSE_TIMES_HISTOGRAM.start_timer();

Expand Down Expand Up @@ -145,7 +145,7 @@ impl EventHandler for ApnsHandler {
&self,
_: Option<Vec<u8>>,
_: HttpRequest,
) -> Box<Future<Item=(), Error=()> + 'static + Send> {
) -> Box<dyn Future<Item=(), Error=()> + 'static + Send> {
warn!("We don't handle http request events here");
Box::new(ok(()))
}
Expand Down
9 changes: 1 addition & 8 deletions src/apns2/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,11 @@
#[macro_use] extern crate slog;
#[macro_use] extern crate slog_scope;

extern crate a2;
extern crate common;
extern crate futures;
extern crate heck;
extern crate serde_json;
extern crate tokio_timer;

mod consumer;
mod notifier;
mod producer;

use consumer::ApnsHandler;
use crate::consumer::ApnsHandler;
use std::env;

use common::{config::Config, system::System};
Expand Down
2 changes: 1 addition & 1 deletion src/apns2/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common::{
};

use heck::SnakeCase;
use CONFIG;
use crate::CONFIG;

pub struct ApnsProducer {
producer: ResponseProducer,
Expand Down
2 changes: 1 addition & 1 deletion src/common/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use kafka;
use crate::kafka;
use toml;

use std::{fs::File, io::prelude::*};
Expand Down
File renamed without changes.
File renamed without changes.
20 changes: 10 additions & 10 deletions src/common/kafka/request_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use rdkafka::{
consumer::{CommitMode, Consumer, stream_consumer::StreamConsumer},
topic_partition_list::{Offset, TopicPartitionList},
};
use kafka::Config;
use events::{
use crate::kafka::Config;
use crate::events::{
application::Application,
push_notification::PushNotification,
http_request::HttpRequest,
Expand Down Expand Up @@ -34,15 +34,15 @@ pub trait EventHandler {
&self,
key: Option<Vec<u8>>,
event: PushNotification,
) -> Box<Future<Item = (), Error = ()> + 'static + Send>;
) -> Box<dyn Future<Item = (), Error = ()> + 'static + Send>;

/// Try to send a http request. If key parameter is set, the response
/// will be sent with the same routing key.
fn handle_http(
&self,
key: Option<Vec<u8>>,
event: HttpRequest,
) -> Box<Future<Item = (), Error = ()> + 'static + Send>;
) -> Box<dyn Future<Item = (), Error = ()> + 'static + Send>;

/// Handle tenant configuration for connection setup.
fn handle_config(
Expand Down Expand Up @@ -98,7 +98,7 @@ impl<H: EventHandler + Send + Sync + 'static> RequestConsumer<H> {

info!("Starting config processing");

self.handler(consumer, control, &|msg: BorrowedMessage| {
self.handler(consumer, control, &|msg: BorrowedMessage<'_>| {
let convert_key = msg.key().and_then(|key| {
String::from_utf8(key.to_vec()).ok()
});
Expand Down Expand Up @@ -162,7 +162,7 @@ impl<H: EventHandler + Send + Sync + 'static> RequestConsumer<H> {

info!("Starting events processing");

self.handler(consumer, control, &|msg: BorrowedMessage| {
self.handler(consumer, control, &|msg: BorrowedMessage<'_>| {
debug!(
"Got message";
"topic" => msg.topic(),
Expand Down Expand Up @@ -197,7 +197,7 @@ impl<H: EventHandler + Send + Sync + 'static> RequestConsumer<H> {
&self,
consumer: StreamConsumer,
control: oneshot::Receiver<()>,
process_event: &Fn(BorrowedMessage) -> Result<(), ()>
process_event: &dyn Fn(BorrowedMessage<'_>) -> Result<(), ()>
) -> Result<(), ()> {
let mut core = Runtime::new().unwrap();

Expand All @@ -219,7 +219,7 @@ impl<H: EventHandler + Send + Sync + 'static> RequestConsumer<H> {
Ok(())
}

fn handle_push(&self, msg: &BorrowedMessage) {
fn handle_push(&self, msg: &BorrowedMessage<'_>) {
let event_parsing = msg.payload()
.and_then(|payload| parse_from_bytes::<PushNotification>(payload).ok());

Expand All @@ -242,7 +242,7 @@ impl<H: EventHandler + Send + Sync + 'static> RequestConsumer<H> {
}
}

fn handle_http(&self, msg: &BorrowedMessage) {
fn handle_http(&self, msg: &BorrowedMessage<'_>) {
let event_parsing = msg.payload()
.and_then(|payload| parse_from_bytes::<HttpRequest>(payload).ok());

Expand All @@ -261,7 +261,7 @@ impl<H: EventHandler + Send + Sync + 'static> RequestConsumer<H> {
}
}

fn handle_config(&self, msg_id: &str, msg: Option<&BorrowedMessage>) {
fn handle_config(&self, msg_id: &str, msg: Option<&BorrowedMessage<'_>>) {
let event = msg
.and_then(|msg| msg.payload())
.and_then(|payload| parse_from_bytes::<Application>(&payload).ok());
Expand Down
4 changes: 2 additions & 2 deletions src/common/kafka/response_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use rdkafka::{
},
};

use kafka::Config;
use crate::kafka::Config;
use protobuf::Message;
use std::sync::Arc;

Expand Down Expand Up @@ -42,7 +42,7 @@ impl ResponseProducer {
pub fn publish(
&self,
key: Option<Vec<u8>>,
event: &Message,
event: &dyn Message,
) -> DeliveryFuture {
let payload = event.write_to_bytes().unwrap();

Expand Down
19 changes: 0 additions & 19 deletions src/common/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,6 @@
#[macro_use] extern crate slog;
#[macro_use] extern crate slog_scope;

extern crate a2;
extern crate argparse;
extern crate chan_signal;
extern crate chrono;
extern crate erased_serde;
extern crate futures;
extern crate http;
extern crate hyper;
extern crate protobuf;
extern crate rdkafka;
extern crate serde;
extern crate tokio;
extern crate toml;
extern crate web_push;
extern crate slog_json;
extern crate slog_async;
extern crate slog_term;
extern crate regex;

pub mod config;
pub mod events;
pub mod kafka;
Expand Down
10 changes: 5 additions & 5 deletions src/common/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use slog_term::{TermDecorator, CompactFormat};
use slog_async::Async;
use slog_json::Json;

use events::{
use crate::events::{
push_notification::PushNotification,
http_response::HttpResponse,
http_request::HttpRequest,
Expand Down Expand Up @@ -47,7 +47,7 @@ impl Logger {
}

impl KV for PushNotification {
fn serialize(&self, _record: &Record, serializer: &mut Serializer) -> slog::Result {
fn serialize(&self, _record: &Record<'_>, serializer: &mut dyn Serializer) -> slog::Result {
serializer.emit_str("device_token", self.get_device_token())?;
serializer.emit_str("universe", self.get_universe())?;
serializer.emit_str("correlation_id", self.get_header().get_correlation_id())?;
Expand All @@ -57,7 +57,7 @@ impl KV for PushNotification {
}

impl slog::Value for HttpResponse {
fn serialize(&self, _record: &Record, _key: Key, serializer: &mut Serializer) -> slog::Result {
fn serialize(&self, _record: &Record<'_>, _key: Key, serializer: &mut dyn Serializer) -> slog::Result {
if self.has_payload() {
serializer.emit_str(
"status_code",
Expand All @@ -73,7 +73,7 @@ impl slog::Value for HttpResponse {
}

impl slog::Value for HttpRequest {
fn serialize(&self, _record: &Record, _key: Key, serializer: &mut Serializer) -> slog::Result {
fn serialize(&self, _record: &Record<'_>, _key: Key, serializer: &mut dyn Serializer) -> slog::Result {
serializer.emit_str("correlation_id", self.get_header().get_correlation_id())?;
serializer.emit_str("request_type", self.get_request_type().as_ref())?;
serializer.emit_str("request_body", self.get_body())?;
Expand Down Expand Up @@ -114,7 +114,7 @@ impl slog::Value for HttpRequest {
}

impl KV for Application {
fn serialize(&self, _record: &Record, serializer: &mut Serializer) -> slog::Result {
fn serialize(&self, _record: &Record<'_>, serializer: &mut dyn Serializer) -> slog::Result {
serializer.emit_str("app_id", self.get_id())?;

if self.has_organization() {
Expand Down
10 changes: 5 additions & 5 deletions src/common/system.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use chan_signal::{notify, Signal};
use config::Config;
use kafka::EventHandler;
use kafka::RequestConsumer;
use metrics::StatisticsServer;
use crate::config::Config;
use crate::kafka::EventHandler;
use crate::kafka::RequestConsumer;
use crate::metrics::StatisticsServer;
use std::{thread, thread::JoinHandle, sync::Arc};
use futures::sync::oneshot;
use logger::Logger;
use crate::logger::Logger;
use slog_scope;

pub struct System;
Expand Down
8 changes: 4 additions & 4 deletions src/fcm/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use common::{
use futures::{Future, future::ok};

use std::sync::RwLock;
use notifier::Notifier;
use producer::FcmProducer;
use crate::notifier::Notifier;
use crate::producer::FcmProducer;

pub struct FcmHandler {
producer: FcmProducer,
Expand Down Expand Up @@ -56,7 +56,7 @@ impl EventHandler for FcmHandler {
&self,
key: Option<Vec<u8>>,
event: PushNotification,
) -> Box<Future<Item = (), Error = ()> + 'static + Send> {
) -> Box<dyn Future<Item = (), Error = ()> + 'static + Send> {
let timer = RESPONSE_TIMES_HISTOGRAM.start_timer();
CALLBACKS_INFLIGHT.inc();

Expand Down Expand Up @@ -86,7 +86,7 @@ impl EventHandler for FcmHandler {
&self,
_: Option<Vec<u8>>,
_: HttpRequest
) -> Box<Future<Item=(), Error=()> + 'static + Send> {
) -> Box<dyn Future<Item=(), Error=()> + 'static + Send> {
warn!("We don't handle http request events here");
Box::new(ok(()))
}
Expand Down
7 changes: 1 addition & 6 deletions src/fcm/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,12 @@
#[macro_use] extern crate slog;
#[macro_use] extern crate slog_scope;

extern crate common;
extern crate fcm;
extern crate futures;

mod consumer;
mod notifier;
mod producer;

use common::{config::Config, system::System};

use consumer::FcmHandler;
use crate::consumer::FcmHandler;
use std::env;

lazy_static! {
Expand Down
2 changes: 1 addition & 1 deletion src/fcm/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use common::{
};

use fcm::response::{FcmError, FcmResponse, ErrorReason::*};
use CONFIG;
use crate::CONFIG;

pub struct FcmProducer {
producer: ResponseProducer,
Expand Down
8 changes: 4 additions & 4 deletions src/http_requester/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use common::{
};

use futures::{Future, future::ok};
use requester::Requester;
use producer::HttpResponseProducer;
use crate::requester::Requester;
use crate::producer::HttpResponseProducer;

pub struct HttpRequestHandler {
producer: HttpResponseProducer,
Expand All @@ -36,7 +36,7 @@ impl EventHandler for HttpRequestHandler {
&self,
_: Option<Vec<u8>>,
_: PushNotification,
) -> Box<Future<Item = (), Error = ()> + 'static + Send> {
) -> Box<dyn Future<Item = (), Error = ()> + 'static + Send> {
warn!("We don't handle push notification events here");
Box::new(ok(()))
}
Expand All @@ -45,7 +45,7 @@ impl EventHandler for HttpRequestHandler {
&self,
key: Option<Vec<u8>>,
event: HttpRequest,
) -> Box<Future<Item = (), Error = ()> + 'static + Send> {
) -> Box<dyn Future<Item = (), Error = ()> + 'static + Send> {
let producer = self.producer.clone();

let timer = RESPONSE_TIMES_HISTOGRAM.start_timer();
Expand Down
14 changes: 1 addition & 13 deletions src/http_requester/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,12 @@
#[macro_use] extern crate slog;
#[macro_use] extern crate slog_scope;

extern crate tokio_timer;
extern crate protobuf;
extern crate common;
extern crate fcm;
extern crate futures;
extern crate hyper;
extern crate hyper_tls;
extern crate http;
extern crate bytes;
extern crate chrono;

mod consumer;
mod requester;
mod producer;

use common::{config::Config, system::System};

use consumer::HttpRequestHandler;
use crate::consumer::HttpRequestHandler;
use std::env;

lazy_static! {
Expand Down
Loading