# Source code for my_application.sparkEvaluator


# Copyright 2020 Nedeljko Radulovic, Dihia Boulegane, Albert Bifet
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F


def evaluate(spark_context, kafka_broker, competition, competition_config, window_duration,
             prediction_window_duration, train_schema, prediction_schema, columns_to_sum, checkpoints,
             targets):
    """
    Run the online evaluation of incremental predicting models.

    Two Kafka topics are read as streams: the released (golden) instances from
    ``<prefix>spark_train`` and the user predictions from ``<prefix>predictions``, where
    ``<prefix>`` is the lower-cased competition name with spaces removed. Predictions are
    left-outer joined to the golden instances on the row id, per-instance error columns are
    computed for every configured measure, and the results are aggregated per user.

    Three streaming queries are started, all writing JSON values to Kafka:

    * ``<prefix>spark_predictions`` - the de-duplicated prediction stream,
    * ``<prefix>spark_golden`` - the golden stream,
    * ``<prefix>spark_measures`` - the per-user evaluation measures (``update`` output mode).

    The call then blocks in ``awaitAnyTermination()``.

    :param spark_context: Object exposing ``readStream`` and ``streams``
        (presumably a ``SparkSession`` - TODO confirm against caller)
    :param kafka_broker: Address of kafka server
    :param competition: Competition object; the attributes ``name``, ``competition_id``,
        ``initial_batch_size`` and ``predictions_time_interval`` are read
    :param competition_config: Mapping from target name to an iterable of measure names;
        the measures handled here are "MAPE", "MSE", "MAE" and "ACC", any other is ignored
    :param window_duration: How long test batches are kept when performing the join. It is
        passed unchanged to ``withWatermark`` and interpolated after ``interval`` in the join
        condition, so it must be valid for both (e.g. a string such as "60 seconds" -
        TODO confirm against caller)
    :param prediction_window_duration: Watermark delay for the prediction stream, passed
        unchanged to ``withWatermark``
    :param train_schema: Column names and types in the training batch
    :param prediction_schema: Column names and types in the prediction batch
    :param columns_to_sum: Columns to aggregate (sum) per user for evaluation
    :param checkpoints: Checkpoint locations for Spark jobs; index 0 is used for the
        prediction query, 1 for the golden query and 2 for the measures query
    :param targets: target column names
    :return: None
    """
    topic_prefix = competition.name.lower().replace(" ", "")
    timestamp_format = "yyyy-MM-dd HH:mm:ss"

    # Reading stream of released (golden) instances from Kafka
    golden = (
        spark_context
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_broker)
        .option("subscribe", topic_prefix + 'spark_train')
        .option("kafkaConsumer.pollTimeoutMs", 5000)
        .option("failOnDataLoss", "false")
        .load()
        .selectExpr("cast (value as string) as json")
        .select(F.from_json("json", train_schema).alias("data"))
        .select("data.*")
        .withColumn("timestamp_deadline",
                    F.unix_timestamp('Deadline', timestamp_format).cast(TimestampType()))
        .withColumn("timestamp_released",
                    F.unix_timestamp('Released', timestamp_format).cast(TimestampType()))
        .drop("Deadline", "Released")
        .withWatermark("timestamp_released", window_duration)
    )

    # Reading stream of predictions from Kafka
    prediction_stream = (
        spark_context
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_broker)
        .option("subscribe", topic_prefix + 'predictions')
        .option("kafkaConsumer.pollTimeoutMs", 1000)
        .option("failOnDataLoss", "false")
        .load()
        .selectExpr("cast (value as string) as json")
        .select(F.from_json("json", prediction_schema).alias("data"))
        .select("data.*")
        .withColumn("timestamp_submitted",
                    F.unix_timestamp('submitted_on', timestamp_format).cast(TimestampType()))
        .drop("submitted_on")
        # Renamed so the join condition can tell it apart from the golden stream's rowID.
        .withColumnRenamed("rowID", "prediction_rowID")
        .withWatermark("timestamp_submitted", prediction_window_duration)
        .dropDuplicates(["user_id", "prediction_competition_id", "prediction_rowID"])
    )

    # Joining two streams: a prediction matches a golden row with the same id when it was
    # submitted no earlier than the release and no later than release + window_duration.
    join_condition = F.expr(
        "rowID = prediction_rowID "
        "AND timestamp_submitted >= timestamp_released "
        "AND timestamp_submitted <= timestamp_released + interval {}".format(window_duration)
    )
    submitted_in_time = F.col("timestamp_submitted") <= F.col("timestamp_deadline")
    seconds_to_submit = (F.unix_timestamp(F.col("timestamp_submitted"))
                         - F.unix_timestamp(F.col("timestamp_released")))
    join_result = (
        prediction_stream
        .join(golden, join_condition, "leftOuter")
        .withColumn("total_num", F.lit(1))
        # 1 when submitted on or before the deadline, otherwise 0 (also 0 for unmatched
        # rows, whose deadline is null after the left outer join).
        .withColumn("num_submissions", F.when(submitted_in_time, 1).otherwise(0))
        .withColumn("penalized", F.abs(F.col("num_submissions") - F.lit(1)))
        # Rows not counted as a submission get the competition's prediction interval
        # as their latency.
        .withColumn("latency",
                    F.when(F.col("num_submissions") == 0, competition.predictions_time_interval)
                    .otherwise(seconds_to_submit))
        .drop("timestamp_released", "competition_id", "prediction_competition_id",
              "prediction_rowID")
    )

    join_result = _add_instance_measures(join_result, competition_config, targets)
    # TODO: Kappa statistic

    agg_exp = [F.expr('sum(`{0}`)'.format(column_name)) for column_name in columns_to_sum]
    agg_exp.append(F.expr('max(rowID) as max_rowID'))

    # Grouping by user_id and aggregation
    agg_results = (
        join_result
        .drop("timestamp_deadline", "timestamp_submitted")
        .groupBy("user_id")
        .agg(*agg_exp)
        .withColumn("competition_id", F.lit(competition.competition_id))
        .withColumn("total_number_of_messages",
                    F.col("max_rowID") - F.lit(competition.initial_batch_size))
        .drop("max_rowID")
    )
    # max(rowID) is null when none of the user's predictions matched a golden row;
    # fall back to the number of predictions the user sent.
    agg_results = agg_results.withColumn(
        "total_number_of_messages",
        F.when(agg_results["total_number_of_messages"].isNotNull(),
               agg_results["total_number_of_messages"])
        .otherwise(agg_results["sum(total_num)"]))

    # Calculating batch measures
    batch_measures = _add_batch_measures(agg_results, competition_config, targets)

    # Query 1: forward the de-duplicated predictions.
    (prediction_stream
     .selectExpr("to_json(struct(*)) AS value")
     .writeStream
     .queryName(topic_prefix + 'prediction_stream')
     .trigger(processingTime='30 seconds')
     .format("kafka")
     .option("kafka.bootstrap.servers", kafka_broker)
     .option("topic", topic_prefix + 'spark_predictions')
     .option("checkpointLocation", checkpoints[0])
     .outputMode("append")
     .start())

    # Query 2: forward the golden instances (no explicit trigger is set for this query).
    (golden
     .selectExpr("to_json(struct(*)) AS value")
     .writeStream
     .queryName(topic_prefix + 'training_stream')
     .format("kafka")
     .option("kafka.bootstrap.servers", kafka_broker)
     .option("topic", topic_prefix + 'spark_golden')
     .option("checkpointLocation", checkpoints[1])
     .outputMode("append")
     .start())

    # Query 3: publish the per-user measures together with mean latency and counters.
    (batch_measures
     .withColumn("latency", batch_measures["sum(latency)"] / (batch_measures["sum(total_num)"]))
     .withColumnRenamed("sum(penalized)", "penalized")
     .withColumnRenamed("sum(num_submissions)", "num_submissions")
     .drop("sum(latency)", "sum(total_num)")
     .selectExpr("to_json(struct(*)) AS value")
     .writeStream
     .queryName(topic_prefix + 'measure_stream')
     .trigger(processingTime='30 seconds')
     .format("kafka")
     .option("kafka.bootstrap.servers", kafka_broker)
     .option("topic", topic_prefix + 'spark_measures')
     .option("checkpointLocation", checkpoints[2])
     .outputMode("update")
     .start())

    spark_context.streams.awaitAnyTermination()


def _add_instance_measures(join_result, competition_config, targets):
    """Add one per-row ``<MEASURE>_<target>`` column for every configured measure of every target."""
    for target in targets:
        prediction_target_col = "prediction_" + target.replace(" ", "")
        computed_any = False
        for measure in competition_config[target]:
            if measure == "MAPE":
                # Rows without a true value count as a relative error of 1.
                value = F.when(
                    join_result[target].isNotNull(),
                    F.abs(join_result[target] - join_result[prediction_target_col])
                    / F.abs(join_result[target])).otherwise(1)
            elif measure == "MSE":
                value = F.when(
                    join_result[target].isNotNull(),
                    (join_result[target] - join_result[prediction_target_col])
                    * (join_result[target] - join_result[prediction_target_col])).otherwise(0)
            elif measure == "MAE":
                value = F.when(
                    join_result[target].isNotNull(),
                    F.abs(join_result[target] - join_result[prediction_target_col])).otherwise(0)
            elif measure == "ACC":
                value = F.when(F.col(target) == F.col(prediction_target_col), 1).otherwise(0)
            else:
                # Unknown measure names are ignored.
                continue
            join_result = join_result.withColumn(measure + "_" + target, value)
            computed_any = True
        # The raw columns are removed only after every measure of this target has been
        # computed; removing them after the first measure would break the following ones.
        if computed_any:
            join_result = join_result.drop(target, prediction_target_col)
    return join_result


def _add_batch_measures(agg_results, competition_config, targets):
    """Turn every aggregated ``sum(<MEASURE>_<target>)`` column into the per-user measure column."""
    # Accumulate on one DataFrame so that every configured measure ends up in the output,
    # and so that a result exists even when no known measure is configured.
    batch_measures = agg_results
    for target in targets:
        for measure in competition_config[target]:
            measure_col = measure + "_" + target
            summed_col = "sum(" + measure_col + ")"
            if measure == "MAPE":
                # Messages the user did not answer are each counted as an error of 1.
                value = 100 * (
                    (batch_measures[summed_col]
                     + (batch_measures["total_number_of_messages"] - batch_measures["sum(total_num)"]))
                    / batch_measures["total_number_of_messages"])
            elif measure in ("MSE", "MAE"):
                # NOTE(review): the missed-submission penalty is added after the division,
                # unlike MAPE where it is inside the numerator - confirm this is intended.
                value = (batch_measures[summed_col] / batch_measures["total_number_of_messages"]
                         + (batch_measures["total_number_of_messages"]
                            - batch_measures["sum(num_submissions)"]))
            elif measure == "ACC":
                value = batch_measures[summed_col] / batch_measures["total_number_of_messages"]
            else:
                continue
            batch_measures = batch_measures.withColumn(measure_col, value).drop(summed_col)
    return batch_measures