API Reference

my_application package

Submodules

my_application.auth module

my_application.auth.decode_auth_token(auth_token)[source]

Decode authentication token.

Parameters

auth_token

Returns

Responses: {100: Invalid or expired token, 200: Success}

my_application.auth.encode_auth_token(user)[source]

Encode authentication token.

Parameters

user

Returns

Responses: {401: Error, 200: Success}
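
The encode and decode helpers above form a pair: one issues a signed token for a user, the other verifies it and reports a status. The module's actual token format, signing library, and secret are not documented here, so the following is only a minimal standard-library sketch of the idea. The names sketch_encode_token, sketch_decode_token, SECRET_KEY, and the payload fields are hypothetical; the 100/200 status codes mirror the decode responses listed above.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET_KEY = b"change-me"  # hypothetical secret, not taken from the documented module


def sketch_encode_token(user_id, lifetime_seconds=3600, now=None):
    """Return a signed token string carrying the user id and an expiry time."""
    issued_at = time.time() if now is None else now
    payload = {"sub": user_id, "exp": issued_at + lifetime_seconds}
    body = base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8"))
    signature = hmac.new(SECRET_KEY, body, hashlib.sha256).hexdigest()
    return body.decode("ascii") + "." + signature


def sketch_decode_token(token, now=None):
    """Return (200, user_id) for a valid token and (100, None) otherwise."""
    current = time.time() if now is None else now
    try:
        body, signature = token.rsplit(".", 1)
        expected = hmac.new(SECRET_KEY, body.encode("ascii"), hashlib.sha256).hexdigest()
        # Constant-time comparison of the received and recomputed signatures.
        if not hmac.compare_digest(signature.encode("ascii"), expected.encode("ascii")):
            return 100, None
        payload = json.loads(base64.urlsafe_b64decode(body.encode("ascii")))
    except (ValueError, UnicodeError):
        return 100, None  # malformed token
    if payload["exp"] < current:
        return 100, None  # expired token
    return 200, payload["sub"]
```

The optional now argument exists only so that expiry can be checked deterministically; a caller would normally omit it.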

my_application.auth.get_auth_token(user)[source]

Get authentication token for a given user.

Parameters

user – dictionary {‘id’: user email, ‘password’: user password}

Returns

Responses: {404: User is not registered or password is incorrect, 401: User has not confirmed the email address, 500: Internal server error}

Login succeeds if the password is correct and the user has confirmed their email address.

my_application.consumer module

class my_application.consumer.DataStreamerServicer(server, competition, competition_config)[source]

Bases: object

The Datastream Servicer handles the communication with users: it sends the datastream records and handles the predictions that users send back.

It creates the topics, starts the Kafka producers, and loads the data structures for communication with users, which are generated from the .proto file.

sendData(request_iterator, context)[source]

After the user has initialized the communication with the server, it checks the user’s credentials and starts sending the data records.

Parameters
  • request_iterator – Sent by the user through the gRPC/Protobuf protocol

  • context – data

class my_application.consumer.ProducerToMongoSink(kafka_server)[source]

Bases: object

Kafka producer.

Publishes messages to a given topic.

daemon = True
producer = None
send(topic, prediction)[source]

Publish messages to a given topic in byte format.

Parameters
  • topic – Kafka topic

  • prediction – data

Returns

my_application.consumer.receive_predictions(predictions, competition_id, user_id, end_date, kafka_producer, spark_topic, targets, stop)[source]

Receives the predictions from the users and publishes them to a Kafka topic so that they can be read by the Spark module.

Parameters
  • predictions – Batch predictions sent by user

  • competition_id – Competition ID

  • user_id – User ID

  • end_date – Competition end date

  • kafka_producer – Kafka producer for a given user

  • spark_topic – Kafka topic to publish in, so it would be read by Spark

  • targets – label columns

  • stop – Kill signal for the thread

Returns
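
The forwarding loop described for receive_predictions can be sketched as follows: each prediction is tagged with the user, serialized to bytes, and published until the stop signal is set. This is a sketch under assumptions, not the module's code. RecordingProducer is a hypothetical stand-in for a Kafka producer, forward_predictions is an illustrative name, and JSON encoded as UTF-8 is only one possible byte format.

```python
import json
import threading


class RecordingProducer:
    """Stand-in for a Kafka producer: remembers what was sent instead of publishing."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


def forward_predictions(predictions, user_id, producer, topic, stop):
    """Publish each prediction as UTF-8 JSON bytes until `stop` is set."""
    forwarded = 0
    for prediction in predictions:
        if stop.is_set():
            break  # kill signal received: stop forwarding
        message = dict(prediction, user_id=user_id)
        producer.send(topic, json.dumps(message).encode("utf-8"))
        forwarded += 1
    return forwarded
```

Using a threading.Event as the kill signal lets another thread end the loop without interrupting a message that is already being sent.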

my_application.producer module

class my_application.producer.CompetitionProducer(server)[source]

Bases: object

static chunker(seq, size)[source]

Returns data in chunks (batches) of a given size.
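
A minimal sketch of slice-based chunking, assuming that seq supports len() and slicing and that the last chunk may be shorter than size. The name chunk_sequence is hypothetical and the real static method may differ in detail.

```python
def chunk_sequence(seq, size):
    """Yield consecutive slices of seq, each holding at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(seq), size):
        yield seq[start:start + size]


batches = list(chunk_sequence([1, 2, 3, 4, 5], 2))
```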

create_competition(competition, items, predictions, initial_batch)[source]

Create a competition and start releasing the data stream.

daemon = True
static is_not_empty(row)[source]

Check whether the row is not empty.

main(topic, initial_batch, items, predictions, initial_training_time, batch_size, time_interval, predictions_time_interval, spark_topic, competition_id)[source]

Recreates the stream. Sends the data in batches: first test (without the target value) and then train batches. All batches are sent according to the time intervals set for the current competition.

Parameters
  • topic

  • initial_batch

  • items

  • predictions

  • initial_training_time

  • batch_size

  • time_interval

  • predictions_time_interval

  • spark_topic

  • competition_id

Returns

producer = None
send(topic, message)[source]
class my_application.producer.Scheduler[source]

Bases: object

Job Scheduler.

Used to store new competitions in MongoDBJobStore and initiate the beginning of the competition based on its start date.

schedule_competition(competition, competition_config)[source]

Schedule when the competition starts and ends.

Parameters
  • competition – Competition object

  • competition_config – Competition config

start()[source]

Starts the scheduler.

my_application.producer.read_csv_file(competition, competition_config, data_format='csv')[source]

Reads the data from a .csv file for a given competition.

Parameters
  • competition – Competition object

  • competition_config – Competition configuration

  • data_format – Format of the data. For now, the only format supported is ‘csv’

Returns

The data read in batches (without the target value), the target values, and the initial batch (with target values)
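
The three return values can be illustrated with a small standard-library sketch. It assumes that the target column name and both batch sizes would come from the competition configuration; split_stream and its parameters are hypothetical names, and the CSV is read from an in-memory string rather than from a file.

```python
import csv
import io


def split_stream(csv_text, target_column, initial_batch_size, batch_size):
    """Split CSV rows into unlabelled batches, target values, and a labelled initial batch."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    initial_batch = rows[:initial_batch_size]  # keeps the target values
    remaining = rows[initial_batch_size:]

    # Target values are kept separately so they can be released later.
    targets = [row[target_column] for row in remaining]
    unlabelled = [
        {key: value for key, value in row.items() if key != target_column}
        for row in remaining
    ]
    batches = [
        unlabelled[start:start + batch_size]
        for start in range(0, len(unlabelled), batch_size)
    ]
    return batches, targets, initial_batch
```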

my_application.producer.terminate(processes)[source]

my_application.repository module

class my_application.repository.MongoRepository(mongo_host)[source]

Bases: object

client = None
create_collection(db_name, collection_name)[source]

Create a collection inside the database db_name.

Parameters
  • db_name – Name of the database

  • collection_name – Name of the collection

create_database(db_name)[source]

Create a database in MongoDB.

Parameters

db_name – Name of the database

get_competition_data_records(competition_id)[source]

Fetch the datastream.

Parameters

competition_id – Competition ID

Returns

Datastream where each record (row) is a dictionary

get_competition_evaluation_measures(competition_id)[source]

Fetch the evaluation metrics for a given competition.

Parameters

competition_id – Competition ID

Returns

List of metrics

get_last_predictions_by_user(competition_id, now, field, measure, user_id, evaluation_time_interval)[source]

NOT USED!

Retrieve the latest predictions sent by a user.

Parameters
  • competition_id

  • now

  • field

  • measure

  • user_id

  • evaluation_time_interval

Returns

Dictionary with the computed evaluation metric for a given user

get_results_by_user(competition_id, field, measure, user_id)[source]

Fetch evaluation metric values for a given user and competition.

Parameters
  • competition_id – Competition ID

  • field – Label column name

  • measure – Evaluation metric

  • user_id – User ID

Returns

Dictionary with the computed evaluation metric for a given user

get_standard_evaluation_measures()[source]

Retrieve the standard set of evaluation metrics offered on the platform.

Returns

List of measures (by name: MSE, MAPE, ACC…)

get_users_ranking_by_field_by_measure(competition_id, field, measure)[source]

Retrieve rankings of users for a specific label column and evaluation metric.

Parameters
  • competition_id – Competition ID

  • field – Label column name

  • measure – Evaluation metric

Returns

List of dictionaries with the computed evaluation metric for all users

insert_document(db_name, collection_name, document)[source]

Insert a document in collection (collection_name) in database (db_name).

Parameters
  • db_name – Database name

  • collection_name – Collection name

  • document – json document

Returns

insert_standard_measures(standard_measures)[source]

Store the standard set of evaluation metrics.

Parameters

standard_measures – List of metrics

my_application.sparkEvaluator module

my_application.sparkEvaluator.evaluate(spark_context, kafka_broker, competition, competition_config, window_duration, prediction_window_duration, train_schema, prediction_schema, columns_to_sum, checkpoints, targets)[source]

The function for online evaluation of the incremental predicting models. It computes selected evaluation measures and stores them in the database.

Parameters
  • spark_context

  • kafka_broker – Address of kafka server

  • competition – Competition object

  • competition_config – Competition configuration dictionary

  • window_duration – Time (in seconds) to keep the test batches in the memory when performing a join

  • prediction_window_duration – Time (in seconds) to keep the predictions in the memory when performing a join

  • train_schema – Column names and types in the training batch

  • prediction_schema – Column names and types in the prediction batch

  • columns_to_sum – Columns to aggregate for evaluation

  • checkpoints – Checkpoint locations for Spark jobs

  • targets – target column names

Returns

Writes to MongoDB: instances of the stream, predictions by users and evaluation measures for each user
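
The core of the evaluation, joining each prediction to its true record and computing a measure per user, can be illustrated in plain Python. The documented function works on Spark streams with time windows, which this sketch does not reproduce. The field names rowID and user_id and the function name are hypothetical, and MSE is used only as an example measure.

```python
from collections import defaultdict


def mean_squared_error_by_user(golden, predictions, target):
    """Join predictions to true records on 'rowID' and average squared errors per user."""
    truth = {record["rowID"]: record[target] for record in golden}
    squared_errors = defaultdict(list)
    for prediction in predictions:
        row_id = prediction["rowID"]
        if row_id not in truth:
            continue  # no matching true record available for the join
        error = prediction[target] - truth[row_id]
        squared_errors[prediction["user_id"]].append(error ** 2)
    return {user: sum(errs) / len(errs) for user, errs in squared_errors.items()}
```

Predictions without a matching true record are skipped here. In a windowed streaming join the same happens when a record falls outside the retention window.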

my_application.sparkToMongo module

class my_application.sparkToMongo.SparkToMongo(kafka_server, prediction_topic, golden_topic, measures_topic, competition, configuration)[source]

Bases: object

Consumer of Spark data. This class implements a Kafka consumer that receives data from Spark and stores it in MongoDB. It handles messages from three topics: evaluation metrics (a.k.a. measures), predictions, and original data records (a.k.a. golden).
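
Handling three topics in one consumer usually comes down to routing each message by its topic name. The sketch below shows that routing with a plain dictionary of handlers. TopicDispatcher and the topic names used with it are hypothetical, and no Kafka client is involved.

```python
class TopicDispatcher:
    """Route each (topic, message) pair to the handler registered for that topic."""

    def __init__(self):
        self.handlers = {}

    def register(self, topic, handler):
        self.handlers[topic] = handler

    def dispatch(self, topic, message):
        handler = self.handlers.get(topic)
        if handler is None:
            return False  # unknown topic: the message is ignored
        handler(message)
        return True
```

In the class documented here, the handlers would correspond to process_golden, process_predictions, and process_measures.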

process_golden(mess)[source]

Receive and store the original data records, writing them to the ‘golden_standard’ database in MongoDB.

Parameters

mess – message as json

process_measures(mess, previous_batch, now)[source]

Process the messages on the measures topic. Write to the ‘evaluation_measures’ database.

Parameters
  • mess – current message

  • previous_batch – previous message

  • now – timestamp ‘now’, to follow the time interval between the messages

process_predictions(mess)[source]

Receive and store predictions in the ‘predictions_v2’ database in MongoDB.

Parameters

mess – message as json to write

run()[source]

Main method, polls Kafka consumer for new messages.

Returns

my_application.stream_server module

class my_application.stream_server.StreamServer(server_port, options)[source]

Bases: object

StreamServer class. It handles the communication through gRPC/Protobuf.

add_server(streamer, competition)[source]

Define the communication protocol, e.g. import the methods and data structures that define the protocol from the files generated by compiling the .proto file.

Parameters
  • streamer

  • competition

server = None
start_server()[source]

my_application.subscription_auth module

my_application.subscription_auth.decode_subscription_token(token)[source]

Decode token.

Parameters

token

Returns

my_application.subscription_auth.encode_subscription_token(competition_id, user_id)[source]

Encode token.

Parameters
  • competition_id

  • user_id

Returns

my_application.subscription_auth.get_subscription_token(competition_id, user_id)[source]

Retrieve token.

Parameters
  • competition_id

  • user_id

Returns

my_application.views module

my_application.views.allowed_file(filename)[source]

Check if the file has an allowed file extension.

Parameters

filename

Returns

True/False
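
An extension check of this kind is commonly written as a lookup against a set of permitted extensions. The sketch below assumes a case-insensitive comparison. ALLOWED_EXTENSIONS and both function names are hypothetical, since the extensions the application actually accepts are not listed in this reference.

```python
ALLOWED_EXTENSIONS = {"csv", "proto"}  # hypothetical whitelist


def get_extension(filename):
    """Return the lower-cased text after the last dot, or '' if there is none."""
    if "." not in filename:
        return ""
    return filename.rsplit(".", 1)[1].lower()


def is_allowed(filename):
    """Return True if the file's extension is in ALLOWED_EXTENSIONS."""
    return get_extension(filename) in ALLOWED_EXTENSIONS
```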

my_application.views.authorized(*roles)[source]

Give authorization only to ADMIN user for some routes.

Parameters

roles

Returns
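
Role-based access of this kind is typically implemented as a decorator factory. The sketch below is framework-free and makes several assumptions. The current role is supplied by a hypothetical get_current_role callable instead of being read from a request, Forbidden stands in for an HTTP error response, and require_roles is an illustrative name rather than the documented authorized(*roles).

```python
import functools


class Forbidden(Exception):
    """Raised when the current user has none of the required roles."""


def require_roles(get_current_role, *roles):
    """Build a decorator that only lets callers with one of `roles` through."""
    def decorator(view):
        @functools.wraps(view)
        def wrapper(*args, **kwargs):
            if get_current_role() not in roles:
                raise Forbidden("role not permitted for this route")
            return view(*args, **kwargs)
        return wrapper
    return decorator
```

Applying @require_roles(lookup, "ADMIN") to a view function would then restrict that route to administrators.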

my_application.views.check_subscription()[source]

Check if the user is subscribed to a given competition.

Returns

True or False

my_application.views.code_generator(competition_id)[source]
my_application.views.competitions()[source]

Competitions route. Implements two methods:

  • GET – retrieve the competitions

  • POST – create a new competition from the parameters below

Parameters
  • Competition name

  • Datastream

  • Description of the competition

  • Competition settings: size of initial batch, initial training time, size of a regular batch, time interval between the batches, name of the label column and the evaluation metric, start and end date, time interval to send the predictions, .proto file

Returns

Responses: {200: if method == “GET”, return the list of the competitions; if method == “POST”, confirm success, 500: Error}

my_application.views.confirm_email(token)[source]

Check if the confirmation token is correct.

Parameters

token

Returns

my_application.views.confirm_token(token, expiration=3600)[source]
my_application.views.delete_subscription()[source]

Removes the subscription for a competition.

Returns

Response: {200: OK}

my_application.views.delete_users(user)[source]
my_application.views.download_data(competition_id)[source]

Download the .csv file with the records from the stream that have been published until now.

Parameters

competition_id – Competition ID

Returns

.csv file

my_application.views.download_proto_file(competition_id)[source]

Download the proto file for a given competition.

Parameters

competition_id – Competition ID

Returns

Responses: {.proto file, 404: File not found}

my_application.views.generate_confirmation_token(email)[source]
my_application.views.get_competition_evaluation_measure(competition_id)[source]

Get evaluation measures for a given competition.

Parameters

competition_id – Competition ID

Returns

Evaluation measure, or list if there are multiple

my_application.views.get_competition_golden_standard(competition_id)[source]

Retrieve the dataset with labels (training dataset). NOT USED.

Parameters

competition_id – Competition ID

Returns

my_application.views.get_competition_info(competition_id)[source]

Retrieve the competition by its id.

Parameters

competition_id – Competition id

Returns

The competition object

my_application.views.get_competition_stream(competition_id)[source]

Retrieve the stream. NOT USED.

Parameters

competition_id – Competition ID

Returns

my_application.views.get_competitions_by_user(user_id)[source]

Retrieve the list of the competitions to which user is subscribed.

Parameters

user_id – User ID

Returns

List of the competitions.

my_application.views.get_datastream_info(datastream_id)[source]

Retrieve information about the dataset.

Parameters

datastream_id – Dataset id

Returns

Response: Return the information about the dataset (name, description)

my_application.views.get_datastreams()[source]

Datastreams route. It implements two methods:

  • GET – retrieve already existing datasets

  • POST – add a new dataset from the parameters below

Parameters
  • Dataset name

  • Dataset description

  • Dataset file

Returns

Responses: {If method == “GET”: return the list of the datasets, If method == “POST”: {200: confirm success, 500: Error}}

my_application.views.get_file_extension(filename)[source]

Get the extension of the file.

Parameters

filename

my_application.views.get_leaderboard_by_competition(competition_id)[source]

Leaderboard route. Retrieves the results, for a given competition, for all the users to show it on the leaderboard.

Parameters

competition_id – Competition ID

Returns

Ordered list with users and their results
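
Ordering the leaderboard amounts to sorting the per-user results by score. The direction depends on the metric: lower is better for error measures such as MSE, higher is better for accuracy. The following sketch assumes each result is a dictionary with hypothetical keys user and score; build_leaderboard is an illustrative name.

```python
def build_leaderboard(results, lower_is_better=True):
    """Sort results so the best score comes first and attach a 1-based rank."""
    ordered = sorted(
        results,
        key=lambda entry: entry["score"],
        reverse=not lower_is_better,
    )
    return [dict(entry, rank=position) for position, entry in enumerate(ordered, start=1)]
```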

my_application.views.get_messages(competition_id, field, measure, user_id)[source]

Stream live results to the chart.

Parameters
  • competition_id – Competition ID

  • field – Label column

  • measure – Evaluation metric

  • user_id – User ID

Returns

Batch metrics to be shown on the live chart.

my_application.views.get_secret_key()[source]

Retrieve the subscription token for a given user and competition.

Returns

Responses: {200: Confirm the token, 404: If user is not subscribed}

my_application.views.get_standard_evaluation_measures()[source]

Get all available evaluation measures.

Returns

List of the evaluation measures

my_application.views.get_users(user)[source]
my_application.views.index()[source]
my_application.views.login()[source]

Login route. Provide the credentials (email/password).

Returns

my_application.views.me(user)[source]
my_application.views.register()[source]

Registration route. Provides a form to register on the platform. A confirmation e-mail with a token is then sent to validate the registration.

Parameters

First name, last name, email, password

Returns

Response: {200: confirm success}

my_application.views.results_by_user_by_competition(competition_id)[source]

Retrieve results of the users for a given competition.

Parameters

competition_id – Competition id

Returns

List of users with results

my_application.views.subscriptions()[source]

Subscriptions route. Registers the users’ subscriptions to competitions.

Returns

Responses: {200: Confirm success}

my_application.views.test()[source]

Module contents