Scala-akka: - Передача аргументов командной строки в sbt run для поиска файлов .csv

У меня есть программа командной строки, которая вычисляет статистику по данным датчика влажности.

У меня также есть файлы .csv внутри src/main/scala/data

Когда я делаю sbt "run data" или sbt "run src/main/scala/data"

Похоже, он не может найти файлы .csv, и я получаю результат как 0.

Выход для sbt "run src/main/scala/data"

Ищем файлы CSV в каталоге: src/main/scala/data

Выход для sbt "run data"

Ищем файлы CSV в каталоге: данные Количество обработанных файлов: 0 Количество обработанных измерений: 0 Количество неудачных измерений: 0 Датчики с самой высокой средней влажностью: идентификатор датчика, мин., среднее, макс.

Ожидаемый пример вывода: -

Количество обработанных файлов: 2 Количество обработанных измерений: 7 Количество неудачных измерений: 2 Датчики с самой высокой средней влажностью: идентификатор датчика, мин., среднее, макс. с2,78,82,88 с1,10,54,98 s3, НаН, НаН, НаН

Код для справки: -

import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global

object HumiditySensorStatistics {

  case class HumidityData(sum: Double, count: Int) {
    def avg: Option[Double] = if (count > 0) Some(sum / count) else None
  }

  case class SensorStats(min: Option[Double], avg: Option[Double], max: Option[Double])

  def main(args: Array[String]): Unit = {
    val directoryPath = args(0)
    implicit val system: ActorSystem = ActorSystem("HumiditySensorStatistics")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val sensors = mutable.Map[String, HumidityData]()
    var failedMeasurements = 0

    println(s"Looking for CSV files in directory: $directoryPath")

    val fileSource = Source.fromIterator(() => new File(directoryPath).listFiles().iterator)
    val measurementSource = fileSource.flatMapConcat(f => FileIO.fromPath(f.toPath))
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      .drop(1) // skip header line
      .map(_.utf8String)
      .map(line => {
        val fields = line.split(",")
        (fields(0), fields(1))
      })
    val sink = Sink.foreach[(String, String)](data => {
      val sensorId = data._1
      val humidity = data._2.toDoubleOption

      if (humidity.isDefined) {
        sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0)) match {
          case HumidityData(sum, count) => HumidityData(sum + humidity.get, count + 1)
        })
      } else {
        failedMeasurements += 1
      }
    })

    measurementSource.runWith(sink).onComplete(_ => {
      val numFilesProcessed = sensors.size
      val numMeasurementsProcessed = sensors.values.map(_.count).sum
      val numFailedMeasurements = failedMeasurements

      println(s"Num of processed files: $numFilesProcessed")
      println(s"Num of processed measurements: $numMeasurementsProcessed")
      println(s"Num of failed measurements: $numFailedMeasurements")

      val statsBySensor = sensors.map {
        case (sensorId, humidityData) =>
          val stats = SensorStats(
            min = Some(humidityData.sum / humidityData.count),
            avg = humidityData.avg,
            max = Some(humidityData.sum / humidityData.count)
          )
          (sensorId, stats)
      }

      println("Sensors with highest avg humidity:")
      println("sensor-id,min,avg,max")
      statsBySensor.toList.sortBy(_._2.avg).reverse.foreach {
        case (sensorId, stats) =>
          println(s"$sensorId,${stats.min.getOrElse("NaN")},${stats.avg.getOrElse("NaN")},${stats.max.getOrElse("NaN")}")
      }
      
      system.terminate()
    })
  }
}

Build.sbt

ThisBuild / version := "0.1.0-SNAPSHOT"

ThisBuild / scalaVersion := "2.13.8"

lazy val root = (project in file("."))
  .settings(
    name := "sensor-task"
  )

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream" % "2.6.16",
)

Данные файла .csv: -

идентификатор датчика влажность с1 80 с3 NaN с2 78 с1 98

Вы пробовали пройти абсолютный путь?

Gaël J 15.02.2023 13:22

В зависимости от того, как вы запускаете свой код, «текущий каталог» может отличаться (например, IDE против простого SBT).

Gaël J 15.02.2023 13:22

Да, я скопировал абсолютный путь к каталогу «данные», и он не возвращает результаты. D:\sensor-task> sbt "run sensor-task\src\main\scala\data" и D:\sensor-task> sbt "run D:\sensor-task\src\main\scala\data"

Always_A_Learner 15.02.2023 13:58
Laravel с Turbo JS
Laravel с Turbo JS
Turbo - это библиотека JavaScript для упрощения создания быстрых и высокоинтерактивных веб-приложений. Она работает с помощью техники под названием...
Типы ввода HTML: Лучшие практики и советы
Типы ввода HTML: Лучшие практики и советы
HTML, или HyperText Markup Language , является стандартным языком разметки, используемым для создания веб-страниц. Типы ввода HTML - это различные...
Аутсорсинг разработки PHP для индивидуальных веб-решений
Аутсорсинг разработки PHP для индивидуальных веб-решений
Услуги PHP-разработки могут быть экономически эффективным решением для компаний, которые ищут высококачественные услуги веб-разработки по доступным...
Понимание Python и переход к SQL
Понимание Python и переход к SQL
Перед нами лабораторная работа по BloodOath:
Слишком много useState? Давайте useReducer!
Слишком много useState? Давайте useReducer!
Современный фронтенд похож на старую добрую веб-разработку, но с одной загвоздкой: страница в браузере так же сложна, как и бэкенд.
Узнайте, как использовать теги <ul> и <li> для создания неупорядоченных списков в HTML
Узнайте, как использовать теги <ul> и <li> для создания неупорядоченных списков в HTML
HTML предоставляет множество тегов для структурирования и организации содержимого веб-страницы. Одним из наиболее часто используемых тегов для...
0
3
56
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Обе ваши команды sbt (то есть sbt "run data" и sbt "run src/main/scala/data") выглядят правильно, при условии, что вы запускаете sbt из корня проекта Scala с исходным кодом в «src/main/scala/» и csv-файлами в «src/main/scala». /данные/".

Пара наблюдаемых проблем с кодом:

При создании fileSource есть большая вероятность, что new File().listFiles() получит больше файлов, чем вы собираетесь включить (например, файлы, отличные от CSV, скрытые файлы и т. д.), в результате чего после прохождения через Framing.delimiter() появится один большой двоичный объект, который впоследствии будет удален drop(1). В таком случае карта «sensors» будет пустой, в результате чего на выходе будут все 0.

Мне удалось воспроизвести результат «все 0», используя ваш точный исходный код и «build.sbt», по-видимому, из-за файлов, отличных от CSV (в моем тестовом примере, файл «.DS_Store»), включенных в listFiles().

Предоставление конкретных критериев выбора файлов для listFiles(), таких как включение только «*.csv», как показано ниже, должно решить проблему:

val fileSource = Source.fromIterator( () =>
  new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")).iterator
)

Другая проблема заключается в том, что логика вычисления (humidityData.sum / humidityData.count) для min и max неверна, по существу повторяя вычисление avg. Чтобы их вычислить, можно расширить параметры в HumidityData следующим образом:

case class HumidityData(sum: Double, count: Int, min: Double, max: Double) {...}

Затем min/max можно обновить примерно так:

humidity match {
  case Some(h) =>
    sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0, Double.MaxValue, 0.0)) match {
      case HumidityData(sum, count, min, max) =>
        HumidityData(sum + h, count + 1, Math.min(h, min), Math.max(h, max))
    })
  case None =>
    failedMeasurements += 1
}

В качестве примечания я бы рекомендовал отделить данные от кода, переместив файлы данных из «src/main/scala/» и, возможно, поместив их, скажем, в «src/main/resources/data/».

Тестирование со следующими файлами данных csv...

File src/main/resources/data/sensor_data1.csv:
sensor-id,humidity
s1,80
s3,NaN
s2,78
s1,98

File src/main/resources/data/sensor_data2.csv:
sensor-id,humidity
s1,70
s3,80
s2,60

$ sbt "run src/main/resources/data"
[info] welcome to sbt 1.5.5 (Oracle Corporation Java 1.8.0_181)
[info] loading settings for project global-plugins from idea.sbt ...
[info] loading global plugins from /Users/leo/.sbt/1.0/plugins
[info] loading project definition from /Users/leo/work/so-75459442/project
[info] loading settings for project root from build.sbt ...
[info] set current project to sensor-task (in build file:/Users/leo/work/so-75459442/)
[info] running HumiditySensorStatistics src/main/resources/data
Looking for CSV files in directory: src/main/resources/data
Num of processed sensors: 3
Num of processed measurements: 7
Num of failed measurements: 1
Sensors with highest avg humidity:
sensor-id,min,avg,max
s3,NaN,NaN,NaN
s1,70.0,82.66666666666667,98.0
s2,60.0,69.0,78.0
[success] Total time: 2 s, completed Feb 16, 2023 10:33:35 AM

Прилагается исправленный исходный код.

import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global

object HumiditySensorStatistics {

  case class HumidityData(sum: Double, count: Int, min: Double, max: Double) {
    def avg: Option[Double] = if (count > 0) Some(sum / count) else None
  }

  case class SensorStats(min: Option[Double], avg: Option[Double], max: Option[Double])

  def main(args: Array[String]): Unit = {
    val directoryPath = args(0)
    implicit val system: ActorSystem = ActorSystem("HumiditySensorStatistics")
    // implicit val materializer: ActorMaterializer = ActorMaterializer()  // Not needed for Akka Stream 2.6+
    val sensors = mutable.Map[String, HumidityData]()
    var failedMeasurements = 0

    println(s"Looking for CSV files in directory: $directoryPath")

    val fileSource = Source.fromIterator( () =>
      new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")).iterator
    )
    val measurementSource = fileSource.flatMapConcat(f => FileIO.fromPath(f.toPath))
      .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
      .drop(1) // skip header line
      .map(_.utf8String)
      .map(line => {
        val fields = line.split(",")
        (fields(0), fields(1))
      })
    val sink = Sink.foreach[(String, String)](data => {
      val sensorId = data._1
      val humidity = data._2.toDoubleOption

      humidity match {
        case Some(h) =>
          sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0, Double.MaxValue, 0.0)) match {
            case HumidityData(sum, count, min, max) =>
              HumidityData(sum + h, count + 1, Math.min(h, min), Math.max(h, max))
          })
        case None =>
          failedMeasurements += 1
      }
    })

    measurementSource.runWith(sink).onComplete(_ => {
      val numSensorsProcessed = sensors.size
      val numMeasurementsProcessed = sensors.values.map(_.count).sum
      val numFailedMeasurements = failedMeasurements

      println(s"Num of processed sensors: $numSensorsProcessed")
      println(s"Num of processed measurements: $numMeasurementsProcessed")
      println(s"Num of failed measurements: $numFailedMeasurements")

      val statsBySensor = sensors.map {
        case (sensorId, humidityData) =>
          val stats = SensorStats(
            min = Some(humidityData.min),
            avg = humidityData.avg,
            max = Some(humidityData.max)
          )
          (sensorId, stats)
      }

      println("Sensors with highest avg humidity:")
      println("sensor-id,min,avg,max")
      statsBySensor.toList.sortBy(_._2.avg).reverse.foreach {
        case (sensorId, stats) =>
          println(s"$sensorId,${stats.min.getOrElse("NaN")},${stats.avg.getOrElse("NaN")},${stats.max.getOrElse("NaN")}")
      }

      system.terminate()
    })
  }
}

После внесения всех изменений я получаю только Looking for CSV files in directory: src/main/resources/data по команде sbt "run src/main/resources/data". и когда я ввожу команду типа ```sbt "запустить данные", это дает мне Поиск файлов CSV в каталоге: данные Количество обработанных файлов: 0 Количество обработанных измерений: 0 Количество неудачных измерений: 0 Датчики с самой высокой средней влажностью: датчик- id,min,avg,max Датчики без измерений:

Always_A_Learner 16.02.2023 07:17

Я также ограничил его и выполнил команду от имени администратора. Поиск файлов CSV в каталоге: src/main/resources/data Файл обработки: LeaderOne.csv Файл обработки: sensor-data.csv [успешно] Общее время: 5 с, выполнено 16 февраля 2023 г. 13:31:18 Количество обработанных файлов: 3 Количество обработанных измерений: 7 Количество неудачных измерений: 1 я не получаю ни одного из них Sensors with highest avg humidity: sensor-id,min,avg,max

Always_A_Learner 16.02.2023 09:20

Я отладил код и обнаружил, что sensors, который является изменяемой картой, все время остается пустым и содержит пометки внутри него. И я не могу понять, как это работает на вашей стороне.

Always_A_Learner 17.02.2023 11:02

Пустая изменяемая карта — это то, с чем я столкнулся, когда listFile() включал неправильные данные csv. Вы пытались использовать точный пересмотренный исходный код и тестовые данные, которые я указал в своем ответе? Было бы полезно, если бы у вас был минимальный набор тестовых данных, который может воспроизвести результат «все 0», и в этом случае просто добавьте его вместе с выводом sbt к вашему вопросу.

Leo C 17.02.2023 17:21

Да, я использовал только ваш код. При добавлении println(s"Added humidity reading $humidity for sensor $sensorId. Current data: ${sensors(sensorId)}, keys: ${sensors.keys}") внутрь val sink данные для датчиков заполняются. Но когда я попытался напечатать датчик за пределами val sink, он пуст. Это также означает, что когда управление достигает этого места val statsBySensor, а sensor.map ничего не находит внутри mutableMap. Это может быть причиной того, что он ничего не печатает после println("sensor-id,min,avg,max") на консоль.

Always_A_Learner 17.02.2023 18:07

Если вы видите sensors заполненным во время создания val source, но не получаете выходных данных от обратного вызова onComplete, возможно, ваше приложение завершает работу до того, как обратный вызов успеет завершиться. Попробуйте добавить Await в конце основного приложения, например. сделайте measurementSource.runWith(sink) val (например, sensorStats), затем sensorStats.onComplete(...), а затем Await.ready(sensorStats, 2000.millis).

Leo C 18.02.2023 01:15

Начал показывать результаты. s3,NaN,NaN,NaN s1,70.0,82.66666666666667,98.0 s2,60.0,69.0,78.0. Принял ответ.

Always_A_Learner 18.02.2023 08:28

Это правильный способ показать количество обработанных файлов: - val files = new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")) val numFiles = files.length Или я должен просто использовать это без фильтра `val files = new File(directoryPath).listFiles`

Always_A_Learner 19.02.2023 08:53

Поскольку файлы, обрабатываемые в вашем потоке, являются отфильтрованными файлами, я бы просто определил val files = new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")) перед определением val fileSource = Source.fromIterator(() => files.iterator), а затем использовал files.length в выводе вашей статистики.

Leo C 19.02.2023 17:45

Другие вопросы по теме