diff --git a/akka-epl/.gitignore b/akka-epl/.gitignore new file mode 100644 index 0000000..bb5dd69 --- /dev/null +++ b/akka-epl/.gitignore @@ -0,0 +1,4 @@ +// you have checked in compiled class files by mistake +// it's good practice to not do that, easiest is to mark +// target as ignored +target/ diff --git a/akka-epl/project/project/target/config-classes/$731f18e1a71d9df9db04.cache b/akka-epl/project/project/target/config-classes/$731f18e1a71d9df9db04.cache deleted file mode 100644 index 0bc0f08..0000000 --- a/akka-epl/project/project/target/config-classes/$731f18e1a71d9df9db04.cache +++ /dev/null @@ -1 +0,0 @@ -sbt.internals.DslEntry \ No newline at end of file diff --git a/akka-epl/src/main/scala/com/epl/akka/AkkaHTTPClient.scala b/akka-epl/src/main/scala/com/epl/akka/AkkaHTTPClient.scala index 33eabc4..cdb9281 100644 --- a/akka-epl/src/main/scala/com/epl/akka/AkkaHTTPClient.scala +++ b/akka-epl/src/main/scala/com/epl/akka/AkkaHTTPClient.scala @@ -10,16 +10,27 @@ import com.epl.akka.AkkaHTTPClient.GET /** * Created by sanjeevghimire on 9/19/17. */ +// this actor seems somewhat redundant in general as all it does is passing requests to singleRequest +// and then folding the response into memory (none of the actual work happens in the thread of the actor) +// it is also not used anywhere, from what I can see, so maybe just completely remove it class AkkaHTTPClient() extends Actor with ActorLogging { import akka.pattern.pipe import context.dispatcher + // this materializer is bound to the lifecycle of the actor system, that means it will leak if + // this actor is stopped. 
Better either provide a single materializer to use across the app + // and take as a constructor parameter here, or to create the materializer with the actor context + // as actor-ref factory (then it will be bound to the actor) + // See docs here: https://doc.akka.io/docs/akka/current/stream/stream-flows-and-basics.html#actor-materializer-lifecycle final implicit val materializer: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(context.system)) val http = Http(context.system) + // the commented out logic around this is incorrect as a GET message immediately triggers work + // done in a future and the actor then continues processing the next message, meaning that when the + // response comes back the originalSender field would contain the latest sender, not the original sender //var originalSender: Option[ActorRef] = None // <-- added override def receive: Receive = { @@ -49,6 +60,7 @@ class AkkaHTTPClient() extends Actor object AkkaHTTPClient { + // use the lambda based props factory Props(new MyActor) instead of the reflection based one def props() = Props[AkkaHTTPClient] diff --git a/akka-epl/src/main/scala/com/epl/akka/CloudantReaderWriter.scala b/akka-epl/src/main/scala/com/epl/akka/CloudantReaderWriter.scala index eb6904d..85b6e74 100644 --- a/akka-epl/src/main/scala/com/epl/akka/CloudantReaderWriter.scala +++ b/akka-epl/src/main/scala/com/epl/akka/CloudantReaderWriter.scala @@ -18,14 +18,25 @@ object CloudantReaderWriter { /** * Created by sanjeevghimire on 9/19/17. */ +// Each of the messages this actor accepts just trigger async work somewhere else, and there is +// no state this actor is superflous and could easily be done with regular methods returning Futures +// If you want to make it bring something to the table that could be for example keeping a cache of +// the documents so that each request does not require a further HTTP request to cloudant, and then +// have a periodic (with Timers) invalidation and or refresh of that cache. 
class CloudantReaderWriter extends Actor with ActorLogging{ implicit val ec = context.dispatcher private val config = context.system.settings.config + override def receive: Receive = { case SaveToCloudantDatabase(jsonString: String) => + // in general side effecting from a Future callback like this is bad practice + // since it will execute on a thread that is not the actor, using `log` should be thread safe + // but it is not good to show in example code as it somewhat encourages doing the wrong thing(tm) + // if you want to react on the future completion in the actor you should use `pipeTo` + // see docs here: https://doc.akka.io/docs/akka/current/actors.html#ask-send-and-receive-future WebHttpClient.post(config.getString("cloudant.post_url"),jsonString,config.getString("cloudant.username"),config.getString("cloudant.password")) onComplete { case Success(body) => log.info("Successfully Saved:: "+ body) @@ -36,11 +47,18 @@ class CloudantReaderWriter extends Actor with ActorLogging{ case GetDocument(documentType) => val url: String = getUrl(documentType) val res = WebHttpClient.getWithHeader(url,config.getString("cloudant.username"),config.getString("cloudant.password")) + // using Strings/generic types as messages is an antipattern, you should define + // a response message for GetDocument and send that back. 
(And having the data as string is also + // questionable, as mentioned elsewhere) res pipeTo sender() case ExpireCurrentDocument() => val url: String = config.getString("cloudant.get_unexpireddoc_url") + // as you just want to react to a successful query, a flatMap(unexpired => further-request) + // would be a more idiomatic way to use futures here than side-effecting in onComplete + // if this actor represents the connection to cloudant it may also make sense to make sure + // failures to talk to cloudant end up crashing the actor WebHttpClient.getWithHeader(url,config.getString("cloudant.username"),config.getString("cloudant.password")) onComplete { case Success(unexpiredDocumentJson) => val jsValue: JsValue = Json.parse(unexpiredDocumentJson) diff --git a/akka-epl/src/main/scala/com/epl/akka/CrawlingApp.scala b/akka-epl/src/main/scala/com/epl/akka/CrawlingApp.scala index 11341a1..3a07d79 100644 --- a/akka-epl/src/main/scala/com/epl/akka/CrawlingApp.scala +++ b/akka-epl/src/main/scala/com/epl/akka/CrawlingApp.scala @@ -5,7 +5,14 @@ import com.epl.akka.WebCrawler.CrawlRequest object CrawlingApp extends App { val system = ActorSystem("Crawler") + // don't ad-hoc create the Props here, define a props method in the companion object instead, + // and avoid using the reflection based props factory method val webCrawler = system.actorOf(Props[WebCrawler], "WebCrawler") // start the crawling webCrawler ! CrawlRequest("https://www.premierleague.com/", true) + + // this app does a single "crawl", but it never completes, the application + // will live forever. That's not what I'd expect from such an app. + // It should somehow track if it is done or not and then terminate the actor system + // so that the JVM can shut down. 
} diff --git a/akka-epl/src/main/scala/com/epl/akka/DBOperation.scala b/akka-epl/src/main/scala/com/epl/akka/DBOperation.scala index 3c0190f..bfd13c8 100644 --- a/akka-epl/src/main/scala/com/epl/akka/DBOperation.scala +++ b/akka-epl/src/main/scala/com/epl/akka/DBOperation.scala @@ -10,7 +10,7 @@ import play.api.libs.json.{JsValue, Json} import scala.util.{Failure, Success} - +// this is never used so better remove it object DBOperation{ case class SaveToCloudantDatabase(jsValue: JsValue){} diff --git a/akka-epl/src/main/scala/com/epl/akka/DocumentType.scala b/akka-epl/src/main/scala/com/epl/akka/DocumentType.scala index eef2c62..bf19617 100644 --- a/akka-epl/src/main/scala/com/epl/akka/DocumentType.scala +++ b/akka-epl/src/main/scala/com/epl/akka/DocumentType.scala @@ -3,6 +3,11 @@ package com.epl.akka /** * Created by sanjeevghimire on 9/15/17. */ +// Enumeration is not often used in Scala code (because it is a pretty clumsy tool) +// a much more common/useful way to do enumerations is to use ADTs, like so: +// sealed trait DocumentType +// case object TeamTableDocument extends DocumentType +// case object FixturesDocument extends DocumentType object DocumentType extends Enumeration{ type Documenttype = Value val TeamTable,Fixtures,Results = Value diff --git a/akka-epl/src/main/scala/com/epl/akka/HTMLParser.scala b/akka-epl/src/main/scala/com/epl/akka/HTMLParser.scala index a4d68a0..803467a 100644 --- a/akka-epl/src/main/scala/com/epl/akka/HTMLParser.scala +++ b/akka-epl/src/main/scala/com/epl/akka/HTMLParser.scala @@ -23,6 +23,8 @@ object HTMLParser { /** * Created by sanjeevghimire on 9/1/17. 
*/ +// this name does not really encode what the actor does given that it fetches pages, parses content +// and turns the result to JSON (the last is not a very good design, see comments below) class HTMLParser() extends Actor with ActorLogging{ val EXPIRED_NO = "NO" @@ -32,6 +34,9 @@ class HTMLParser() extends Actor with ActorLogging{ import scala.collection.JavaConverters._ + // in general it is good practice to move methods that require no context from the actor + // to the companion object (making them static) as that makes it clear that they are pure + // and also makes writing unit tests for them easier. def getValidLinks(content: String, url:String): Iterator[String] = { Jsoup.parse(content, url).select("a[href]").iterator().asScala.map(_.absUrl("href")) } @@ -125,6 +130,13 @@ class HTMLParser() extends Actor with ActorLogging{ val jsValue: JsValue = Json.toJson(resultsMap) val returnJson: JsObject = jsValue.as[JsObject] + ("expired" -> Json.toJson(EXPIRED_NO)) + ("documentType" -> Json.toJson(DocumentType.Results.toString)) + + // using strings (actually even the JSValue types) for internal communication inside + // of the app is in general bad practice, sometimes referred to as "stringly typed" programming, + // and is not good to use as an example for newcomers. + // A proper design would parse data into a specific type here, and then turn that + // into json encoded as a string in the CloudAntReaderWriter which knows what format it + // needs to save things in. Json.stringify(returnJson) } @@ -140,6 +152,10 @@ class HTMLParser() extends Actor with ActorLogging{ //.filter(link => link.endsWith("fixtures") || link.endsWith("tables") || link.endsWith("results")) .filter(link => link.endsWith("fixtures")) .foreach(context.parent ! CrawlRequest(_,false)) + + // rethrow instead and put the decision to stop in the supervision of the parent + // which would make it possible to recover from failure by starting a new actor etc. 
+ // this will instead permanently break the app until the entire app is restarted case Failure(err) => log.error(err, "Error while crawling url: "+ url) stop() @@ -150,6 +166,10 @@ class HTMLParser() extends Actor with ActorLogging{ val isFixtures = url.endsWith("fixtures") val isTables = url.endsWith("tables") val isResults = url.endsWith("results") + // this is a blocking call, which means it should be run on a separate threadpool + // or it may starve the actor system meaning that other actors do not get to execute + // read this section of the docs: + // https://doc.akka.io/docs/akka/current/dispatchers.html#problem-blocking-on-default-dispatcher val body: String = WebHttpClient.getUsingPhantom(url) var jsonString: String = null if(isFixtures) { @@ -160,6 +180,14 @@ class HTMLParser() extends Actor with ActorLogging{ jsonString = getResultsAndConvertToJson(body,url) } log.info("HTML to JSON: "+jsonString) + + // this mixes concerns: why does the HTMLParser actor know that something should be saved to cloudant? + // it should instead be a result of the requested work, it should instead be a response to the command + // and it should be sent to the sender(), using the parent makes it much harder than necessary to test + // case CrawlAndPrepare => + // ... + // sender() ! PreparedResult(data) + context.parent ! 
SaveJsonToCloudant(jsonString) case _: Status.Failure => stop() diff --git a/akka-epl/src/main/scala/com/epl/akka/SoccerMainController.scala b/akka-epl/src/main/scala/com/epl/akka/SoccerMainController.scala index c1fcd22..5acac22 100644 --- a/akka-epl/src/main/scala/com/epl/akka/SoccerMainController.scala +++ b/akka-epl/src/main/scala/com/epl/akka/SoccerMainController.scala @@ -26,6 +26,7 @@ object SoccerMainController extends App with CorsSupport{ val config = system.settings.config + // don't create Props inline, use a factory on the companion val cloudantReader = system.actorOf(Props[CloudantReaderWriter], "cloudantWriter") @@ -34,6 +35,13 @@ object SoccerMainController extends App with CorsSupport{ val route: Route = path("fixtures") { get { + // for a path producing the best practice way would be to have the + // reader actor return a typed object here, and then use the marshalling infrastructure + // to turn the json into bytes. This conflates the way you store data with the way you + // represent data to your client (even if at first they are the same) + // see docs here: https://doc.akka.io/docs/akka-http/current/common/json-support.html#json-support + // Doing this stringly typed may be fine for a home project app, but it is not good for a technology guide + // which inexperienced developers will use as a blueprint val fixtureJson:Future[String] = (cloudantReader ? GetDocument(DocumentType.Fixtures)).mapTo[String] complete( fixtureJson @@ -61,6 +69,7 @@ object SoccerMainController extends App with CorsSupport{ config.getString("http.host"), config.getInt("http.port")) + // isn't this sample for running on the IBM cloud, who is the user that will press return to stop there? 
println( s"Server online at ${config.getString("http.host")}: ${config.getString("http.port")}\nPress RETURN to stop...") StdIn.readLine() // let it run until user presses return diff --git a/akka-epl/src/main/scala/com/epl/akka/TeamTable.scala b/akka-epl/src/main/scala/com/epl/akka/TeamTable.scala index 047e137..3699d2b 100644 --- a/akka-epl/src/main/scala/com/epl/akka/TeamTable.scala +++ b/akka-epl/src/main/scala/com/epl/akka/TeamTable.scala @@ -5,6 +5,8 @@ import play.api.libs.json.Json /** * Created by sanjeevghimire on 9/6/17. */ +// this looks like it was started on to do the right thing (tm) with actual types +// inside of the system case class TeamTable(position: Int, teamName: String, teamShortName: String, @@ -21,6 +23,14 @@ case class TeamTable(position: Int, object TeamTable { + // This is the domain model of the data in the system - + // The JSON format would normally belong closer to the places that interfaces with the outer world - + // one for where you ingest - what data you are served (and which of it you want to keep) + // one for the database - how the data is stored (could also be augmented with application specific data, for example + // for example a timestamp when it was written, who ran it) + // and one for the HTTP endpoint - how/what data is served to the outside (not necessarily all the data you have) + // ofc those formats could be the same, but it is not a responsibility of the model itself how it is serialized + // in different places implicit val jsonFormat = Json.format[TeamTable] def of(teamName:String,item: List[Any]) ={ diff --git a/akka-epl/src/main/scala/com/epl/akka/URLValidator.scala b/akka-epl/src/main/scala/com/epl/akka/URLValidator.scala index 697b283..14ec9e7 100644 --- a/akka-epl/src/main/scala/com/epl/akka/URLValidator.scala +++ b/akka-epl/src/main/scala/com/epl/akka/URLValidator.scala @@ -14,6 +14,10 @@ object URLValidator { /** * Created by sanjeevghimire on 9/1/17. 
*/ + +// Making this an actor doesn't really make sense, it introduces an async boundary from the parent +// actor but there is no gain (neither more resilient nor more performant) than just keeping this state +// directly in WebCrawler class URLValidator extends Actor with ActorLogging{ var visitedUrl = Set.empty[String] var childUrls = Set.empty[ActorRef] diff --git a/akka-epl/src/main/scala/com/epl/akka/WebCrawler.scala b/akka-epl/src/main/scala/com/epl/akka/WebCrawler.scala index e2bc72a..482387e 100644 --- a/akka-epl/src/main/scala/com/epl/akka/WebCrawler.scala +++ b/akka-epl/src/main/scala/com/epl/akka/WebCrawler.scala @@ -9,6 +9,7 @@ import com.epl.akka.WebCrawler._ object WebCrawler { case class CrawlRequest(url: String, isRootUrl: Boolean) {} + // Not used case class CrawlResponse(links: Set[String]) {} case class Crawl(url: String,isRootUrl: Boolean){} case class SaveJsonToCloudant(jsonString: String){} @@ -21,6 +22,7 @@ object WebCrawler { */ class WebCrawler extends Actor with ActorLogging{ + // create a props factory for the actor companions instead of an adhoc Props call here val urlValidator = context.actorOf(Props[URLValidator](new URLValidator())) val htmlParser = context.actorOf(Props[HTMLParser](new HTMLParser())) val cloudantWriter = context.actorOf(Props[CloudantReaderWriter](new CloudantReaderWriter())) diff --git a/akka-epl/src/main/scala/com/epl/akka/WebHttpClient.scala b/akka-epl/src/main/scala/com/epl/akka/WebHttpClient.scala index 772e554..65c7121 100644 --- a/akka-epl/src/main/scala/com/epl/akka/WebHttpClient.scala +++ b/akka-epl/src/main/scala/com/epl/akka/WebHttpClient.scala @@ -27,11 +27,21 @@ import scala.concurrent.{ExecutionContext, Future, Promise} * In this case this returns the html output of the url * being crawled */ +// this object mixes concerns in that it is both a facade for the AsyncHttpClient and +// PhantomJS at the same time, two client technologies that doesn't really have anything in common +// separate those to two 
different places object WebHttpClient { + // in general not a good idea to put state like this in a singleton, could be ok here, but what happens + // if the async http client crashes - no way to restart it as it is effectively a constant per JVM + // (that would be a great use case for using an actor) val PHANTOMJS_PAGE_CUSTOMHEADERS_PREFIX: String = "phantomjs.page.customHeaders." val PHANTOMJS_CLI_ARGS: String = "phantomjs.cli.args" + // a bit of shame that you can't just use the Akka HTTP client here given that you + // already have Akka HTTP as a dependency, was there some showstopper for that? + // if so it would make sense to add a comment here saying "We need X which isn't currently supported + // by Akka HTTP so therefore we use AsyncHttpClient instead" val config = new AsyncHttpClientConfig.Builder() val client = new AsyncHttpClient(config .setFollowRedirect(true) @@ -40,11 +50,15 @@ object WebHttpClient { val USER_AGENT: String= "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36" System.setProperty("phantomjs.page.settings.userAgent", USER_AGENT) + + // if this was instead var driver: WebDriver = null import scala.concurrent.duration._ + // here you create a separate actor system and materializer that starts thread pools/uses resources but + // is not really used implicit val system: ActorSystem = ActorSystem() implicit val materializer: Materializer = ActorMaterializer() @@ -90,7 +104,7 @@ object WebHttpClient { promise.future } - +// remove stuff rather than keep them commented out in a published tutorial sample // def getUsingAkkaHttp(url: String)(implicit system: ActorSystem, mat: Materializer): Future[String] = { // implicit val executionContext = system.dispatcher // @@ -152,6 +166,8 @@ object WebHttpClient { + // another great reason to put this inside an actor, actors have explicit lifecycle, + // and concurrency is handled for you. 
What happens here if multiple threads call this at the same time? def initPhantomJS(){ val desiredCaps: DesiredCapabilities = DesiredCapabilities.phantomjs() desiredCaps.setJavascriptEnabled(true) @@ -165,7 +181,8 @@ object WebHttpClient { driver = new PhantomJSDriver(desiredCaps); } - + // and here as well, this is not thread safe, this will not work given that you call it from + // future callbacks in the rest of the code def getUsingPhantom(url: String): String = { initPhantomJS() driver.get(url)