У меня есть программа командной строки, которая вычисляет статистику по данным датчика влажности.
У меня также есть файлы .csv внутри src/main/scala/data
Когда я делаю sbt "run data" или sbt "run src/main/scala/data"
Похоже, он не может найти файлы .csv, и я получаю результат как 0.
Выход для sbt "run src/main/scala/data"
Ищем файлы CSV в каталоге: src/main/scala/data
Выход для sbt "run data"
Ищем файлы CSV в каталоге: данные Количество обработанных файлов: 0 Количество обработанных измерений: 0 Количество неудачных измерений: 0 Датчики с самой высокой средней влажностью: идентификатор датчика, мин., среднее, макс.
Ожидаемый пример вывода: -
Количество обработанных файлов: 2 Количество обработанных измерений: 7 Количество неудачных измерений: 2 Датчики с самой высокой средней влажностью: идентификатор датчика, мин., среднее, макс. с2,78,82,88 с1,10,54,98 s3, НаН, НаН, НаН
Код для справки: -
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
object HumiditySensorStatistics {
case class HumidityData(sum: Double, count: Int) {
def avg: Option[Double] = if (count > 0) Some(sum / count) else None
}
case class SensorStats(min: Option[Double], avg: Option[Double], max: Option[Double])
def main(args: Array[String]): Unit = {
val directoryPath = args(0)
implicit val system: ActorSystem = ActorSystem("HumiditySensorStatistics")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val sensors = mutable.Map[String, HumidityData]()
var failedMeasurements = 0
println(s"Looking for CSV files in directory: $directoryPath")
val fileSource = Source.fromIterator(() => new File(directoryPath).listFiles().iterator)
val measurementSource = fileSource.flatMapConcat(f => FileIO.fromPath(f.toPath))
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
.drop(1) // skip header line
.map(_.utf8String)
.map(line => {
val fields = line.split(",")
(fields(0), fields(1))
})
val sink = Sink.foreach[(String, String)](data => {
val sensorId = data._1
val humidity = data._2.toDoubleOption
if (humidity.isDefined) {
sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0)) match {
case HumidityData(sum, count) => HumidityData(sum + humidity.get, count + 1)
})
} else {
failedMeasurements += 1
}
})
measurementSource.runWith(sink).onComplete(_ => {
val numFilesProcessed = sensors.size
val numMeasurementsProcessed = sensors.values.map(_.count).sum
val numFailedMeasurements = failedMeasurements
println(s"Num of processed files: $numFilesProcessed")
println(s"Num of processed measurements: $numMeasurementsProcessed")
println(s"Num of failed measurements: $numFailedMeasurements")
val statsBySensor = sensors.map {
case (sensorId, humidityData) =>
val stats = SensorStats(
min = Some(humidityData.sum / humidityData.count),
avg = humidityData.avg,
max = Some(humidityData.sum / humidityData.count)
)
(sensorId, stats)
}
println("Sensors with highest avg humidity:")
println("sensor-id,min,avg,max")
statsBySensor.toList.sortBy(_._2.avg).reverse.foreach {
case (sensorId, stats) =>
println(s"$sensorId,${stats.min.getOrElse("NaN")},${stats.avg.getOrElse("NaN")},${stats.max.getOrElse("NaN")}")
}
system.terminate()
})
}
}
Build.sbt
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / scalaVersion := "2.13.8"
lazy val root = (project in file("."))
.settings(
name := "sensor-task"
)
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % "2.6.16",
)
Данные файла .csv: -
В зависимости от того, как вы запускаете свой код, «текущий каталог» может отличаться (например, IDE против простого SBT).
Да, я скопировал абсолютный путь к каталогу «данные», и он не возвращает результаты. D:\sensor-task> sbt "run sensor-task\src\main\scala\data" и D:\sensor-task> sbt "run D:\sensor-task\src\main\scala\data"
Обе ваши команды sbt (то есть sbt "run data" и sbt "run src/main/scala/data") выглядят правильно, при условии, что вы запускаете sbt из корня проекта Scala с исходным кодом в «src/main/scala/» и csv-файлами в «src/main/scala». /данные/".
Пара наблюдаемых проблем с кодом:
При создании fileSource есть большая вероятность, что new File().listFiles() получит больше файлов, чем вы собираетесь включить (например, файлы, отличные от CSV, скрытые файлы и т. д.), в результате чего после прохождения через Framing.delimiter() появится один большой двоичный объект, который впоследствии будет удален drop(1). В таком случае карта «sensors» будет пустой, в результате чего на выходе будут все 0.
Мне удалось воспроизвести результат «все 0», используя ваш точный исходный код и «build.sbt», по-видимому, из-за файлов, отличных от CSV (в моем тестовом примере, файл «.DS_Store»), включенных в listFiles().
Предоставление конкретных критериев выбора файлов для listFiles(), таких как включение только «*.csv», как показано ниже, должно решить проблему:
val fileSource = Source.fromIterator( () =>
new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")).iterator
)
Другая проблема заключается в том, что логика вычисления (humidityData.sum / humidityData.count) для min и max неверна, по существу повторяя вычисление avg. Чтобы их вычислить, можно расширить параметры в HumidityData следующим образом:
case class HumidityData(sum: Double, count: Int, min: Double, max: Double) {...}
Затем min/max можно обновить примерно так:
humidity match {
case Some(h) =>
sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0, Double.MaxValue, 0.0)) match {
case HumidityData(sum, count, min, max) =>
HumidityData(sum + h, count + 1, Math.min(h, min), Math.max(h, max))
})
case None =>
failedMeasurements += 1
}
В качестве примечания я бы рекомендовал отделить данные от кода, переместив файлы данных из «src/main/scala/» и, возможно, поместив их, скажем, в «src/main/resources/data/».
Тестирование со следующими файлами данных csv...
File src/main/resources/data/sensor_data1.csv:
sensor-id,humidity
s1,80
s3,NaN
s2,78
s1,98
File src/main/resources/data/sensor_data2.csv:
sensor-id,humidity
s1,70
s3,80
s2,60
$ sbt "run src/main/resources/data"
[info] welcome to sbt 1.5.5 (Oracle Corporation Java 1.8.0_181)
[info] loading settings for project global-plugins from idea.sbt ...
[info] loading global plugins from /Users/leo/.sbt/1.0/plugins
[info] loading project definition from /Users/leo/work/so-75459442/project
[info] loading settings for project root from build.sbt ...
[info] set current project to sensor-task (in build file:/Users/leo/work/so-75459442/)
[info] running HumiditySensorStatistics src/main/resources/data
Looking for CSV files in directory: src/main/resources/data
Num of processed sensors: 3
Num of processed measurements: 7
Num of failed measurements: 1
Sensors with highest avg humidity:
sensor-id,min,avg,max
s3,NaN,NaN,NaN
s1,70.0,82.66666666666667,98.0
s2,60.0,69.0,78.0
[success] Total time: 2 s, completed Feb 16, 2023 10:33:35 AM
Прилагается исправленный исходный код.
import java.io.File
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.collection.mutable
import scala.concurrent.ExecutionContext.Implicits.global
object HumiditySensorStatistics {
case class HumidityData(sum: Double, count: Int, min: Double, max: Double) {
def avg: Option[Double] = if (count > 0) Some(sum / count) else None
}
case class SensorStats(min: Option[Double], avg: Option[Double], max: Option[Double])
def main(args: Array[String]): Unit = {
val directoryPath = args(0)
implicit val system: ActorSystem = ActorSystem("HumiditySensorStatistics")
// implicit val materializer: ActorMaterializer = ActorMaterializer() // Not needed for Akka Stream 2.6+
val sensors = mutable.Map[String, HumidityData]()
var failedMeasurements = 0
println(s"Looking for CSV files in directory: $directoryPath")
val fileSource = Source.fromIterator( () =>
new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")).iterator
)
val measurementSource = fileSource.flatMapConcat(f => FileIO.fromPath(f.toPath))
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024, allowTruncation = true))
.drop(1) // skip header line
.map(_.utf8String)
.map(line => {
val fields = line.split(",")
(fields(0), fields(1))
})
val sink = Sink.foreach[(String, String)](data => {
val sensorId = data._1
val humidity = data._2.toDoubleOption
humidity match {
case Some(h) =>
sensors.put(sensorId, sensors.getOrElse(sensorId, HumidityData(0.0, 0, Double.MaxValue, 0.0)) match {
case HumidityData(sum, count, min, max) =>
HumidityData(sum + h, count + 1, Math.min(h, min), Math.max(h, max))
})
case None =>
failedMeasurements += 1
}
})
measurementSource.runWith(sink).onComplete(_ => {
val numSensorsProcessed = sensors.size
val numMeasurementsProcessed = sensors.values.map(_.count).sum
val numFailedMeasurements = failedMeasurements
println(s"Num of processed sensors: $numSensorsProcessed")
println(s"Num of processed measurements: $numMeasurementsProcessed")
println(s"Num of failed measurements: $numFailedMeasurements")
val statsBySensor = sensors.map {
case (sensorId, humidityData) =>
val stats = SensorStats(
min = Some(humidityData.min),
avg = humidityData.avg,
max = Some(humidityData.max)
)
(sensorId, stats)
}
println("Sensors with highest avg humidity:")
println("sensor-id,min,avg,max")
statsBySensor.toList.sortBy(_._2.avg).reverse.foreach {
case (sensorId, stats) =>
println(s"$sensorId,${stats.min.getOrElse("NaN")},${stats.avg.getOrElse("NaN")},${stats.max.getOrElse("NaN")}")
}
system.terminate()
})
}
}
После внесения всех изменений я получаю только Looking for CSV files in directory: src/main/resources/data по команде sbt "run src/main/resources/data". и когда я ввожу команду типа ```sbt "запустить данные", это дает мне Поиск файлов CSV в каталоге: данные Количество обработанных файлов: 0 Количество обработанных измерений: 0 Количество неудачных измерений: 0 Датчики с самой высокой средней влажностью: датчик- id,min,avg,max Датчики без измерений:
Я также ограничил его и выполнил команду от имени администратора. Поиск файлов CSV в каталоге: src/main/resources/data Файл обработки: LeaderOne.csv Файл обработки: sensor-data.csv [успешно] Общее время: 5 с, выполнено 16 февраля 2023 г. 13:31:18 Количество обработанных файлов: 3 Количество обработанных измерений: 7 Количество неудачных измерений: 1 я не получаю ни одного из них Sensors with highest avg humidity: sensor-id,min,avg,max
Я отладил код и обнаружил, что sensors, который является изменяемой картой, все время остается пустым и содержит пометки внутри него. И я не могу понять, как это работает на вашей стороне.
Пустая изменяемая карта — это то, с чем я столкнулся, когда listFile() включал неправильные данные csv. Вы пытались использовать точный пересмотренный исходный код и тестовые данные, которые я указал в своем ответе? Было бы полезно, если бы у вас был минимальный набор тестовых данных, который может воспроизвести результат «все 0», и в этом случае просто добавьте его вместе с выводом sbt к вашему вопросу.
Да, я использовал только ваш код. При добавлении println(s"Added humidity reading $humidity for sensor $sensorId. Current data: ${sensors(sensorId)}, keys: ${sensors.keys}") внутрь val sink данные для датчиков заполняются. Но когда я попытался напечатать датчик за пределами val sink, он пуст. Это также означает, что когда управление достигает этого места val statsBySensor, а sensor.map ничего не находит внутри mutableMap. Это может быть причиной того, что он ничего не печатает после println("sensor-id,min,avg,max") на консоль.
Если вы видите sensors заполненным во время создания val source, но не получаете выходных данных от обратного вызова onComplete, возможно, ваше приложение завершает работу до того, как обратный вызов успеет завершиться. Попробуйте добавить Await в конце основного приложения, например. сделайте measurementSource.runWith(sink) val (например, sensorStats), затем sensorStats.onComplete(...), а затем Await.ready(sensorStats, 2000.millis).
Начал показывать результаты. s3,NaN,NaN,NaN s1,70.0,82.66666666666667,98.0 s2,60.0,69.0,78.0. Принял ответ.
Это правильный способ показать количество обработанных файлов: - val files = new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")) val numFiles = files.length Или я должен просто использовать это без фильтра `val files = new File(directoryPath).listFiles`
Поскольку файлы, обрабатываемые в вашем потоке, являются отфильтрованными файлами, я бы просто определил val files = new File(directoryPath).listFiles((_, name) => name.endsWith(".csv")) перед определением val fileSource = Source.fromIterator(() => files.iterator), а затем использовал files.length в выводе вашей статистики.
Вы пробовали пройти абсолютный путь?