Skip to content

Commit 50dc5ba

Browse files
committed
Add JSON output for local ollama
1 parent 80c431b commit 50dc5ba

File tree

2 files changed

+97
-12
lines changed

2 files changed

+97
-12
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ test [AlpakkaTrophySpec](src/test/scala/alpakka/tcp_to_websockets/AlpakkaTrophyS
178178

179179
## Analyse Wikipedia edits live stream ##
180180

181-
Find out whose Wikipedia articles were changed in (near) real time by tapping into
181+
Find out whose Wikipedia articles were changed in (near) real time by consuming
182182
the [Wikipedia Edits stream provided via SSE](https://wikitech.wikimedia.org/wiki/Event_Platform/EventStreams).
183183
The class [WikipediaEditsAnalyser](src/main/scala/alpakka/sse_to_elasticsearch/WikipediaEditsAnalyser.scala) implements
184184
the following workflow:

src/main/scala/alpakka/sse_to_elasticsearch/WikipediaEditsAnalyser.scala

Lines changed: 96 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ object WikipediaEditsAnalyser extends App {
9292
}
9393

9494
// Local Ollama NER is still experimental; enabled now that JSON output is handled
95-
private val useLocalOllamaNER: Boolean = false
95+
private val useLocalOllamaNER: Boolean = true
9696

9797
// Switch off at runtime via UI to save costs
9898
private val isRemoteProcessingEnabled = new AtomicBoolean(true)
@@ -224,23 +224,84 @@ object WikipediaEditsAnalyser extends App {
224224
}
225225
}
226226

227+
// Case classes to represent the Wikipedia API response structure
228+
case class WikipediaPage(
229+
pageid: Option[Long],
230+
ns: Option[Int],
231+
title: Option[String],
232+
extract: Option[String]
233+
)
234+
235+
case class WikipediaQuery(
236+
pages: Map[String, WikipediaPage]
237+
)
238+
239+
case class WikipediaApiResponse(
240+
batchcomplete: Option[String],
241+
query: Option[WikipediaQuery]
242+
)
243+
244+
// Circe decoders for Wikipedia API response
245+
implicit val wikipediaPageDecoder: Decoder[WikipediaPage] =
246+
Decoder.forProduct4("pageid", "ns", "title", "extract")(WikipediaPage.apply)
247+
248+
implicit val wikipediaQueryDecoder: Decoder[WikipediaQuery] =
249+
Decoder.forProduct1("pages")(WikipediaQuery.apply)
250+
251+
implicit val wikipediaApiResponseDecoder: Decoder[WikipediaApiResponse] =
252+
Decoder.forProduct2("batchcomplete", "query")(WikipediaApiResponse.apply)
253+
227254
private def fetchContent(ctx: Ctx): Future[Ctx] = {
228255
logger.info(s"[${ctx.traceId}] About to read `extract` from Wikipedia entry with title: ${ctx.change.title}")
229256
val encodedTitle = URLEncoder.encode(ctx.change.title, "UTF-8")
230257

231258
val requestURL = s"https://en.wikipedia.org/w/api.php?format=json&action=query&prop=extracts&exlimit=max&explaintext&exintro&titles=$encodedTitle"
259+
232260
Http().singleRequest(HttpRequest(uri = requestURL))
233261
.flatMap(_.entity.toStrict(2.seconds))
234-
.map(_.data.utf8String.split("\"extract\":").reverse.head)
235-
.map(content => ctx.copy(content = content))
262+
.map(_.data.utf8String)
263+
.map { jsonString =>
264+
logger.debug(s"[${ctx.traceId}] Raw Wikipedia API response: $jsonString")
265+
parse(jsonString) match {
266+
case Right(json) =>
267+
json.as[WikipediaApiResponse] match {
268+
case Right(apiResponse) =>
269+
val extractOpt = for {
270+
query <- apiResponse.query
271+
// Get the first page from the pages map (there should only be one for a single title request)
272+
(_, page) <- query.pages.headOption
273+
extract <- page.extract
274+
} yield extract
275+
276+
extractOpt match {
277+
case Some(extract) =>
278+
logger.info(s"[${ctx.traceId}] Successfully extracted content: ${extract.take(100)}...")
279+
ctx.copy(content = extract)
280+
case None =>
281+
logger.warn(s"[${ctx.traceId}] No extract found for title: ${ctx.change.title}")
282+
ctx.copy(content = "")
283+
}
284+
285+
case Left(decodingError) =>
286+
logger.error(s"[${ctx.traceId}] Failed to decode Wikipedia API response: $decodingError")
287+
ctx.copy(content = "")
288+
}
289+
case Left(parsingError) =>
290+
logger.error(s"[${ctx.traceId}] Failed to parse Wikipedia API JSON: $parsingError")
291+
ctx.copy(content = "")
292+
}
293+
}
294+
.recover {
295+
case ex: Exception =>
296+
logger.error(s"[${ctx.traceId}] Error fetching content from Wikipedia API: ${ex.getMessage}", ex)
297+
ctx.copy(content = "")
298+
}
236299
}
237300

238301
private def findPersons(ctx: Ctx): Future[Ctx] = {
239302
val localNERFuture = if (useLocalOllamaNER) {
240-
logger.debug(s"[${ctx.traceId}] Using Local Ollama NER")
241303
findPersonsLocalOllamaNER(ctx)
242304
} else {
243-
logger.debug(s"[${ctx.traceId}] Using Local Java NLP NER")
244305
findPersonsLocalNER(ctx)
245306
}
246307
localNERFuture.flatMap(localResult => findPersonsRemoteNER(localResult))
@@ -407,35 +468,59 @@ object WikipediaEditsAnalyser extends App {
407468
.baseUrl(ollamaContainer.getBaseUrl)
408469
.modelName("llama3.2:1b")
409470
.temperature(0)
471+
.topP(0.1)
410472
.responseFormat(ResponseFormat.JSON)
411473
.timeout(Duration.ofSeconds(30))
412474
.build()
413475

414476
val promptPersons =
415-
"""Extract all names of persons from this text:
477+
"""You are a precise name extraction tool. Extract ONLY actual person names that are LITERALLY PRESENT in this text:
416478
|{{content}}
417479
|
418480
|Rules:
419-
|- Only extract persons: Full names of individuals mentioned in the text
420481
|- Do NOT extract places: Geographical locations including countries, cities, regions, landmarks, or specific addresses
421482
|- Do NOT extract organizations: Names of companies, institutions, government bodies, or any other formal groups
483+
|- If unsure whether something is a person name, exclude it
422484
|- Return output as JSON with exactly this structure: {"names": ["name1", "name2", ...]}
423-
|- If no persons are found, return: {"names": []}
485+
|- If no persons are found in text, return exactly: {"names": []}
486+
|
487+
|Examples:
488+
|Text: "John Smith visited the library yesterday."
489+
|Response: {"names": ["John Smith"]}
490+
|
491+
|Text: "The weather is sunny today in California."
492+
|Response: {"names": []}
493+
|
494+
|Text: "This category is for articles with short descriptions defined on Wikipedia by {{short description}}"
495+
|Response: {"names": []}
496+
|
424497
""".stripMargin
425498

426499
val message = UserMessage.from(promptPersons.replace("{{content}}", content))
427500

428501
try {
429502
val response = model.chat(message)
430503
val personsFoundText = response.aiMessage().text().trim()
431-
432504
val personsFoundList = parseJSONResponse(personsFoundText)
433505

434506
if (personsFoundList.isEmpty) {
435507
Future(ctx)
436508
} else {
437-
logger.info(s"[${ctx.traceId}] Local Ollama NER found persons: $personsFoundList from content: $content")
438-
Future(ctx.copy(personsFoundLocal = personsFoundList))
509+
val verifiedPersons = personsFoundList.filter { personName =>
510+
val isPresent = content.toLowerCase.contains(personName.toLowerCase)
511+
if (!isPresent) {
512+
logger.debug(s"[${ctx.traceId}] Skipping: $personName - not found in content")
513+
}
514+
isPresent
515+
}
516+
517+
if (verifiedPersons.isEmpty) {
518+
logger.debug(s"[${ctx.traceId}] No verified persons found after content validation")
519+
Future(ctx)
520+
} else {
521+
logger.info(s"[${ctx.traceId}] Local Ollama NER found persons: $verifiedPersons from content: $content")
522+
Future(ctx.copy(personsFoundLocal = verifiedPersons))
523+
}
439524
}
440525
} catch {
441526
case e: Exception =>

0 commit comments

Comments
 (0)