Source code for my_application.repositories.BaselineToMongo


# Copyright 2020 Nedeljko Radulovic, Dihia Boulegane, Albert Bifet
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import json
import logging
import operator
import os

import orjson
from confluent_kafka import Consumer, Producer

"""
This module provides the Baseline classifier class for a given problem.
Depending of the nature of the problem regression/classification it creates a baseline model 
average value/ majority class voting.
It serves as first line comparison of performance with user's model.
"""

with open('/home/nedeljko/PycharmProjects/Competition_to_publish/SCALAR/provider/my_application/config.json') as json_data_file:
    config = json.load(json_data_file)

try:
    _MONGO_HOST = os.environ['MONGO_HOST']
except Exception:
    _MONGO_HOST = config['MONGO_HOST']


class BaselineToMongo:
    """Publish baseline predictions for a competition stream.

    Consumes the competition's Kafka topic and keeps running statistics for
    every target from ``INIT``/``TRAIN`` messages. For every ``TEST`` message
    it produces a prediction record to ``self.output_topic``:

    * regression targets (metric ``'MAPE'``): mean of the values seen so far;
    * classification targets (metric ``'F1'``): most frequent label so far.

    NOTE(review): despite the class name, the visible code only writes to
    Kafka; persistence to MongoDB presumably happens downstream — confirm.
    """

    consumer = None
    mongo_repository = None

    def __init__(self, kafka_server, topic, competition, competition_config):
        """Create the Kafka consumer (subscribed to *topic*) and producer.

        :param kafka_server: value for Kafka's ``bootstrap.servers``.
        :param topic: topic carrying the competition messages.
        :param competition: object providing ``initial_training_time``,
            ``competition_id`` and ``name``.
        :param competition_config: mapping of target name -> iterable of
            metric names (``'MAPE'`` marks regression, ``'F1'`` classification).
        """
        conf = {
            'bootstrap.servers': kafka_server,
            'group.id': 'baseline',
            # NOTE(review): multiplier is 10000, not 1000 — confirm the unit of
            # initial_training_time and whether this is intentional headroom.
            'session.timeout.ms': competition.initial_training_time * 10000,
            'auto.offset.reset': 'earliest',
        }
        self.consumer = Consumer(conf)
        self.consumer.subscribe([topic])
        self.config = competition_config
        self.targets = competition_config.keys()
        self.competition_id = competition.competition_id
        self.producer = Producer({'bootstrap.servers': kafka_server})
        # e.g. "My Competition" -> "mycompetitionpredictions"
        self.output_topic = competition.name.lower().replace(" ", "") + 'predictions'

    @staticmethod
    def _split_targets(config):
        """Return ``(regression_targets, classification_targets)`` from *config*.

        A target is appended once per ``'MAPE'`` entry (regression) and once
        per ``'F1'`` entry (classification) in its metric list; other metrics
        are ignored.
        """
        regression_targets = []
        classification_targets = []
        for target, metrics in config.items():
            for metric in metrics:
                if metric == 'MAPE':
                    regression_targets.append(target)
                elif metric == 'F1':
                    classification_targets.append(target)
        return regression_targets, classification_targets

    def write(self, poll_timeout=1.0):
        """Consume messages forever and publish baseline predictions.

        This method never returns. Messages that cannot be processed (bad JSON,
        missing keys, a ``TEST`` message arriving before any training data,
        producer errors, ...) are logged and skipped, so one bad record does
        not stop the baseline.

        :param poll_timeout: seconds ``Consumer.poll`` may block waiting for a
            message. It returns as soon as a message is available, so this only
            affects idle periods; ``0`` reproduces the old busy-polling loop.
        """
        logger = logging.getLogger(__name__)
        regression_targets, classification_targets = self._split_targets(self.config)

        # Per classification target: label -> number of times seen.
        target_dict = {target: {} for target in classification_targets}
        # Per regression target: count and sum of the values seen.
        num_records = {target: 0 for target in regression_targets}
        sum_values = {target: 0 for target in regression_targets}

        while True:
            try:
                msg = self.consumer.poll(timeout=poll_timeout)
                if msg is None:
                    # Nothing arrived within poll_timeout; keep waiting.
                    continue
                if msg.error():
                    logger.warning("Kafka consumer error: %s", msg.error())
                    continue

                message = orjson.loads(msg.value())
                prediction_dict = {'rowID': message['rowID'],
                                   'prediction_competition_id': self.competition_id,
                                   'user_id': 0}
                prediction_dict, num_records, sum_values = self.regression(
                    message, prediction_dict, num_records, sum_values)
                prediction_dict, target_dict = self.classification(
                    message, target_dict, prediction_dict)

                if message['tag'] == 'TEST':
                    submitted_on = datetime.datetime.now()
                    prediction_dict['submitted_on'] = submitted_on.strftime("%Y-%m-%d %H:%M:%S")
                    self.producer.produce(self.output_topic, orjson.dumps(prediction_dict))
                # Serve queued producer events without blocking.
                self.producer.poll(timeout=0)
            except Exception:
                # Best-effort loop: keep consuming, but never drop errors silently.
                logger.exception("Baseline skipped a message that could not be processed")

    @staticmethod
    def regression(message, prediction_dict, num_records, sum_values):
        """Apply or update the running-mean baseline for regression targets.

        * ``TEST``: sets ``prediction_<target>`` in *prediction_dict* to the
          mean of the values seen so far for every target in *sum_values*
          (``ZeroDivisionError`` if a target has no training values yet).
        * ``INIT`` / ``TRAIN``: adds ``float(message[target])`` to the running
          sum and count of every target.
        * any other tag: no change.

        :return: ``(prediction_dict, num_records, sum_values)``; the dicts are
            updated in place.
        :raises KeyError: if ``'tag'`` or a target is missing from *message*.
        :raises ValueError, TypeError: if a target value is not numeric.
        """
        tag = message['tag']
        if tag == 'TEST':
            for target, total in sum_values.items():
                prediction_dict['prediction_' + target] = float(total) / num_records[target]
        elif tag in ('INIT', 'TRAIN'):
            for target in sum_values:
                # Convert before touching either counter so a bad/missing value
                # cannot leave num_records and sum_values out of step.
                value = float(message[target])
                num_records[target] += 1
                sum_values[target] += value
        return prediction_dict, num_records, sum_values

    @staticmethod
    def classification(message, target_dict, prediction_dict):
        """Apply or update the majority-class baseline for classification targets.

        * ``TEST``: sets ``prediction_<target>`` in *prediction_dict* to the
          most frequent label seen so far (ties go to the label first seen;
          ``ValueError`` if a target has no training labels yet).
        * ``INIT`` / ``TRAIN``: increments the count of ``str(message[target])``
          for every target in *target_dict*.
        * any other tag: no change.

        :return: ``(prediction_dict, target_dict)``; the dicts are updated in place.
        :raises KeyError: if ``'tag'`` or a target is missing from *message*.
        """
        tag = message['tag']
        if tag == 'TEST':
            for target, counts in target_dict.items():
                # max() returns the first maximal item, i.e. the earliest-seen label on ties.
                prediction_dict['prediction_' + target] = max(counts.items(), key=operator.itemgetter(1))[0]
        elif tag in ('INIT', 'TRAIN'):
            for target, counts in target_dict.items():
                label = str(message[target])
                counts[label] = counts.get(label, 0) + 1
        return prediction_dict, target_dict