Код не работает для FileTailSource в akka Stream

У меня есть файл журнала, в который подаются данные о каждом посещении некоторых веб-сайтов (на самом деле это просто программа на Python, которая имитирует это), я хочу подсчитать количество посещений на веб-сайты. Итак, я пытаюсь использовать FileTailSource, но он ничего не печатает в файл Можешь помочь? танки `

import akka.actor._
import akka._
import scala.concurrent._
import java.nio.file._
import akka.stream._
import akka.stream.scaladsl._
import akka.util.ByteString
import java.nio.file.Paths

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO,Flow, Sink, Source}
import akka.stream.alpakka.file.scaladsl.FileTailSource
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import java.lang

object LogFileAnalyzer {
  def main(args: Array[String]): Unit = {
    // Create actor system and materializer
    implicit val system = ActorSystem("LogFileAnalyzer")
    implicit val materializer = ActorMaterializer()

    // Read the log file as a source of lines
    val logFile = Paths.get("./src/main/scala/log-generator.log")
    val source =   FileTailSource(logFile,maxChunkSize = 4096,startingPosition=0L,pollingInterval = 250.millis).via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 4096, allowTruncation = false)).map { line =>
      val fields = line.utf8String.split(" ")
      val website = fields(2)
      (website,1)
    }.groupBy(8,_._1).mapConcat()
    .reduce { (entry1, entry2) =>
    val (website1, visits1) = entry1
    val (website2, visits2) = entry2
    (website1, visits1 + visits2)}

    .map { case (website, count) => s"($website, $count)" }
    .map(s=>ByteString(s))



    // Parse each line of the log file to extract the website name
    //val websiteFlow = Flow[ByteString]
    
  
val sink = FileIO.toPath(Paths.get("./f.txt")) 
    // Group the website names and count the number of visits for each website
    //case (website, visits) => (website, visits +1)
    source.to(sink).run()
}
 
}

`

когда я работал с FileIO.fromPath, код работал нормально, но он запускается один раз и не обрабатывает новые потоковые данные я обнаружил проблему, и я думаю, что это как-то связано с функциями groupby или reduce

Ваш основной метод ничего не ждет, и поэтому ваша программа немедленно завершается - см. ответ здесь: stackoverflow.com/questions/74737002/…

MartinHH 18.12.2022 08:53

Тоже не работает, проблема, я думаю, в функции сокращения, потому что, когда я комментирую ее, код работает, он не дает желаемого результата.

ach2ashes 18.12.2022 14:57
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
0
2
65
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Основная проблема (кроме того факта, что ваш основной метод не ожидает завершения вашего потока и, следовательно, ваша программа завершится слишком рано):

reduce выдает только одно значение, когда поток завершается (выдает, когда восходящий поток завершает), но поток никогда не завершится, потому что FileTailSource будет постоянно опрашивать новые строки входного файла.

Возможно, вы захотите использовать сканирование вместо reduce (поскольку scan будет выдавать новый элемент всякий раз, когда новый элемент будет получен от восходящего потока).

Это решает проблему, но не совсем то, что я ищу, теперь у меня столько же дубликатов, сколько и веб-сайтов, спасибо за вашу помощь, хотя дайте мне знать, если вы знаете какой-либо ресурс, который мог бы помочь, спасибо.

ach2ashes 20.12.2022 03:39

Другие вопросы по теме