У меня есть файл журнала, в который подаются данные о каждом посещении некоторых веб-сайтов (на самом деле это просто программа на Python, которая имитирует это), я хочу подсчитать количество посещений на веб-сайты.
Итак, я пытаюсь использовать FileTailSource
, но он ничего не печатает в файл
Можешь помочь?
танки
`
import akka.actor._
import akka._
import scala.concurrent._
import java.nio.file._
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO,Flow, Sink, Source}
import akka.stream.alpakka.file.scaladsl.FileTailSource
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.lang
object LogFileAnalyzer {
def main(args: Array[String]): Unit = {
// Create actor system and materializer
implicit val system = ActorSystem("LogFileAnalyzer")
implicit val materializer = ActorMaterializer()
// Read the log file as a source of lines
val logFile = Paths.get("./src/main/scala/log-generator.log")
val source = FileTailSource(logFile,maxChunkSize = 4096,startingPosition=0L,pollingInterval = 250.millis).via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = false)).map { line =>
val fields = line.utf8String.split(" ")
val website = fields(2)
(website,1)
}.groupBy(8,_._1).mapConcat()
.reduce { (entry1, entry2) =>
val (website1, visits1) = entry1
val (website2, visits2) = entry2
(website1, visits1 + visits2)}
.map { case (website, count) => s"($website, $count)" }
.map(s=>ByteString(s))
// Parse each line of the log file to extract the website name
//val websiteFlow = Flow[ByteString]
val sink = FileIO.toPath(Paths.get("./f.txt"))
// Group the website names and count the number of visits for each website
//case (website, visits) => (website, visits +1)
source.to(sink).run()
}
}
`
когда я работал с FileIO.fromPath
, код работал нормально, но он запускается один раз и не обрабатывает новые потоковые данные
я обнаружил проблему, и я думаю, что это как-то связано с функциями groupby или reduce
Тоже не работает, проблема, я думаю, в функции сокращения, потому что, когда я комментирую ее, код работает, он не дает желаемого результата.
Основная проблема (кроме того факта, что ваш основной метод не ожидает завершения вашего потока и, следовательно, ваша программа завершится слишком рано):
reduce
выдает только одно значение, когда поток завершается (выдает, когда восходящий поток завершает), но поток никогда не завершится, потому что FileTailSource
будет постоянно опрашивать новые строки входного файла.
Возможно, вы захотите использовать сканирование вместо reduce
(поскольку scan
будет выдавать новый элемент всякий раз, когда новый элемент будет получен от восходящего потока).
Это решает проблему, но не совсем то, что я ищу, теперь у меня столько же дубликатов, сколько и веб-сайтов, спасибо за вашу помощь, хотя дайте мне знать, если вы знаете какой-либо ресурс, который мог бы помочь, спасибо.
Ваш основной метод ничего не ждет, и поэтому ваша программа немедленно завершается - см. ответ здесь: stackoverflow.com/questions/74737002/…