API Reference¶
my_application package¶
Subpackages¶
Submodules¶
my_application.auth module¶
-
my_application.auth.
decode_auth_token
(auth_token)[source]¶ Decode authentication token.
- Parameters
auth_token –
- Returns
Responses: {100: Invalid or expired token, 200: Success}
-
my_application.auth.
encode_auth_token
(user)[source]¶ Encode authentication token.
- Parameters
user –
- Returns
Responses: {401: Error, 200: Success}
-
my_application.auth.
get_auth_token
(user)[source]¶ Get authentication token for a given user.
- Parameters
user – dictionary {‘id’: user email, ‘password’: user password}
- Returns
Login success if the password is correct and the user has confirmed their email address. Error responses: {404: User is not registered or password is incorrect, 401: User has not confirmed the email address, 500: Internal server error}
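The token format used by this module is not documented here. As a rough illustration of the encode/decode round trip, the following is a minimal sketch that signs a payload with HMAC from the standard library. The names `SECRET_KEY`, `encode_token`, `decode_token` and the `(code, value)` return shape are assumptions made for this example; only the codes 100 and 200 come from the reference above.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET_KEY = b"change-me"  # hypothetical secret, not taken from my_application


def encode_token(user_id, ttl_seconds=3600, now=None):
    """Return a signed token carrying the user id and an expiry time."""
    now = time.time() if now is None else now
    payload = json.dumps({"sub": user_id, "exp": now + ttl_seconds}).encode()
    body = base64.urlsafe_b64encode(payload)
    signature = hmac.new(SECRET_KEY, body, hashlib.sha256).hexdigest().encode()
    return (body + b"." + signature).decode()


def decode_token(token, now=None):
    """Return (200, user_id) for a valid token, otherwise (100, message)."""
    now = time.time() if now is None else now
    try:
        body, signature = token.encode().rsplit(b".", 1)
    except ValueError:
        # No "." separator: the token is malformed.
        return 100, "Invalid or expired token"
    expected = hmac.new(SECRET_KEY, body, hashlib.sha256).hexdigest().encode()
    if not hmac.compare_digest(signature, expected):
        return 100, "Invalid or expired token"
    payload = json.loads(base64.urlsafe_b64decode(body))
    if payload["exp"] < now:
        return 100, "Invalid or expired token"
    return 200, payload["sub"]
```

The signature is checked before the payload is parsed, so a tampered token is rejected without being decoded.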
my_application.consumer module¶
-
class
my_application.consumer.
DataStreamerServicer
(server, competition, competition_config)[source]¶ Bases:
object
Handles the communication with users: sends the datastream records and receives the predictions that users submit.
It creates the topics, starts the Kafka producers, and loads the data structures for communication with users, which are generated from the .proto file.
-
class
my_application.consumer.
ProducerToMongoSink
(kafka_server)[source]¶ Bases:
object
Kafka producer.
Publishes messages to a given topic.
-
daemon
= True¶
-
producer
= None¶
-
-
my_application.consumer.
receive_predictions
(predictions, competition_id, user_id, end_date, kafka_producer, spark_topic, targets, stop)[source]¶ Receives the predictions from users and publishes them to a Kafka topic so that the Spark module can read them.
- Parameters
predictions – Batch predictions sent by user
competition_id – Competition ID
user_id – User ID
end_date – Competition end date
kafka_producer – Kafka producer for a given user
spark_topic – Kafka topic to publish to, so that Spark can read the predictions
targets – Label columns
stop – Kill signal for the thread
- Returns
my_application.producer module¶
-
class
my_application.producer.
CompetitionProducer
(server)[source]¶ Bases:
object
-
create_competition
(competition, items, predictions, initial_batch)[source]¶ Create a competition and start releasing the data stream.
-
daemon
= True¶
-
main
(topic, initial_batch, items, predictions, initial_training_time, batch_size, time_interval, predictions_time_interval, spark_topic, competition_id)[source]¶ Recreates the stream. Sends the data in batches: first test (without the target value) and then train batches. All batches are sent according to the time intervals set for the current competition.
- Parameters
topic –
initial_batch –
items –
predictions –
initial_training_time –
batch_size –
time_interval –
predictions_time_interval –
spark_topic –
competition_id –
- Returns
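The release order described for main (a test batch without the target value, followed by the corresponding train batch) can be sketched as a plain generator. This is only an illustration: the function name `plan_batches`, the `target` argument, and the assumption that every release is separated by `time_interval` seconds after an initial wait of `initial_training_time` are not taken from the source.

```python
def plan_batches(rows, target, batch_size, initial_training_time, time_interval):
    """Yield (send_at_seconds, kind, records) in the order batches are released.

    Timing is an assumption for this sketch: the first test batch goes out
    after initial_training_time, and each later release follows the previous
    one by time_interval seconds.
    """
    send_at = initial_training_time
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        # The test batch is the same records with the target column removed.
        test = [{k: v for k, v in row.items() if k != target} for row in batch]
        yield send_at, "test", test
        send_at += time_interval
        # The train batch carries the target values for the same records.
        yield send_at, "train", batch
        send_at += time_interval
```

A real producer would publish each yielded batch to the competition topic at the planned time; the generator only makes the ordering explicit.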
-
producer
= None¶
-
-
class
my_application.producer.
Scheduler
[source]¶ Bases:
object
Job Scheduler.
Stores new competitions in MongoDBJobStore and starts each competition on its start date.
-
my_application.producer.
read_csv_file
(competition, competition_config, data_format='csv')[source]¶ Reads the data from a .csv file for a given competition.
- Parameters
competition – Competition object
competition_config – Competition configuration
data_format – Format of the data. For now, the only supported format is ‘csv’
- Returns
The data split into batches (without the target values), the target values, and the initial batch (with target values)
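The three return values can be illustrated with a small standard-library sketch. It is not the module's implementation: the function name `split_stream` and its arguments (`target`, `initial_batch_size`, `batch_size`) are assumptions, and the real function reads its settings from the competition objects instead.

```python
import csv
import io


def split_stream(csv_text, target, initial_batch_size, batch_size):
    """Split CSV rows into (batches without target, targets, initial batch)."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    # The initial batch keeps its target values so users can train on it.
    initial_batch = rows[:initial_batch_size]
    rest = rows[initial_batch_size:]
    batches, targets = [], []
    for start in range(0, len(rest), batch_size):
        chunk = rest[start:start + batch_size]
        batches.append([{k: v for k, v in r.items() if k != target} for r in chunk])
        targets.append([r[target] for r in chunk])
    return batches, targets, initial_batch
```

Note that `csv.DictReader` yields every value as a string, so numeric columns would still need to be converted.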
my_application.repository module¶
-
class
my_application.repository.
MongoRepository
(mongo_host)[source]¶ Bases:
object
-
client
= None¶
-
create_collection
(db_name, collection_name)[source]¶ Create a collection inside the database db_name.
- Parameters
db_name – Name of the database
collection_name – Name of the collection
- Returns
-
create_database
(db_name)[source]¶ Create a database in MongoDB.
- Parameters
db_name – Name of the database
- Returns
-
get_competition_data_records
(competition_id)[source]¶ Fetch the datastream.
- Parameters
competition_id – Competition ID
- Returns
Datastream where each record(row) is a dictionary
-
get_competition_evaluation_measures
(competition_id)[source]¶ Fetch the evaluation metrics for a given competition.
- Parameters
competition_id – Competition ID
- Returns
List of metrics
-
get_last_predictions_by_user
(competition_id, now, field, measure, user_id, evaluation_time_interval)[source]¶ NOT USED!
Retrieve the latest predictions sent by user.
- Parameters
competition_id –
now –
field –
measure –
user_id –
evaluation_time_interval –
- Returns
Dictionary with the computed evaluation metric for a given user
-
get_results_by_user
(competition_id, field, measure, user_id)[source]¶ Fetch evaluation metric values for a given user and competition.
- Parameters
competition_id – Competition ID
field – Label column name
measure – Evaluation metric
user_id – User ID
- Returns
Dictionary with the computed evaluation metric for a given user
-
get_standard_evaluation_measures
()[source]¶ Retrieve the standard set of evaluation metrics offered on the platform.
- Returns
List of measures (by name: MSE, MAPE, ACC…)
-
get_users_ranking_by_field_by_measure
(competition_id, field, measure)[source]¶ Retrieve rankings of users for a specific label column and evaluation metric.
- Parameters
competition_id – Competition ID
field – Label column name
measure – Evaluation metric
- Returns
List of dictionaries with the computed evaluation metric for all users
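How a ranking could be derived from per-user results is sketched below. This is a hedged illustration, not the repository's query: the function `rank_users`, the `HIGHER_IS_BETTER` set, and the shape of the result dictionaries are assumptions. The only fact relied on is that error metrics such as MSE and MAPE are better when lower, while ACC is better when higher.

```python
HIGHER_IS_BETTER = {"ACC"}  # assumed set; error metrics like MSE/MAPE rank ascending


def rank_users(results, measure):
    """Return the results ordered best-first, each with a 1-based 'rank' key."""
    reverse = measure in HIGHER_IS_BETTER
    ordered = sorted(results, key=lambda r: r[measure], reverse=reverse)
    # Copy each dictionary so the caller's input is left unchanged.
    return [dict(r, rank=i) for i, r in enumerate(ordered, start=1)]
```

For example, `rank_users([{"user_id": "u1", "MSE": 0.5}, {"user_id": "u2", "MSE": 0.2}], "MSE")` places `u2` first because its error is lower.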
-
my_application.sparkEvaluator module¶
-
my_application.sparkEvaluator.
evaluate
(spark_context, kafka_broker, competition, competition_config, window_duration, prediction_window_duration, train_schema, prediction_schema, columns_to_sum, checkpoints, targets)[source]¶ The function for online evaluation of the incremental predicting models. It computes selected evaluation measures and stores them in the database.
- Parameters
spark_context –
kafka_broker – Address of kafka server
competition – Competition object
competition_config – Competition configuration dictionary
window_duration – Time (in seconds) to keep the test batches in the memory when performing a join
prediction_window_duration – Time (in seconds) to keep the predictions in the memory when performing a join
train_schema – Column names and types in the training batch
prediction_schema – Column names and types in the prediction batch
columns_to_sum – Columns to aggregate for evaluation
checkpoints – Checkpoint locations for Spark jobs
targets – target column names
- Returns
Writes to MongoDB: instances of the stream, predictions by users and evaluation measures for each user
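The core of the evaluation, joining predictions with the true records and computing a measure, can be illustrated without Spark. The sketch below is a plain-Python stand-in, not the streaming job itself: the function `evaluate_batch`, the join key `rowID`, and the returned dictionary are assumptions made for the example.

```python
def evaluate_batch(golden, predictions, target, key="rowID"):
    """Join predictions to true records by key and compute MSE and MAPE."""
    truth = {row[key]: row[target] for row in golden}
    # Keep only predictions that match a known record (an inner join).
    pairs = [(truth[p[key]], p[target]) for p in predictions if p[key] in truth]
    if not pairs:
        return {}
    n = len(pairs)
    mse = sum((t - p) ** 2 for t, p in pairs) / n
    # MAPE is undefined where the true value is zero, so skip those pairs.
    nonzero = [(t, p) for t, p in pairs if t != 0]
    mape = 100 * sum(abs((t - p) / t) for t, p in nonzero) / len(nonzero) if nonzero else None
    return {"MSE": mse, "MAPE": mape, "count": n}
```

In the streaming setting, window_duration and prediction_window_duration bound how long each side of this join is kept in memory; the sketch joins two complete lists instead.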
my_application.sparkToMongo module¶
-
class
my_application.sparkToMongo.
SparkToMongo
(kafka_server, prediction_topic, golden_topic, measures_topic, competition, configuration)[source]¶ Bases:
object
Consumer of Spark data. This class implements a Kafka consumer that receives data from Spark and stores it in MongoDB. It handles messages from three topics: evaluation metrics (a.k.a. measures), predictions, and original data records (a.k.a. golden).
-
process_golden
(mess)[source]¶ Receive the original data records and write them to the ‘golden_standard’ database in MongoDB.
- Parameters
mess – Message as JSON
- Returns
-
process_measures
(mess, previous_batch, now)[source]¶ Process the messages on the measures topic and write them to the ‘evaluation_measures’ database.
- Parameters
mess – Current message
previous_batch – Previous message
now – Timestamp ‘now’, used to track the time interval between messages
- Returns
-
my_application.stream_server module¶
-
class
my_application.stream_server.
StreamServer
(server_port, options)[source]¶ Bases:
object
StreamServer class. It handles the communication through gRPC/Protobuf.
-
add_server
(streamer, competition)[source]¶ Define the communication protocol, i.e. import the methods and data structures that define the protocol from the files generated by compiling the .proto file.
- Parameters
streamer –
competition –
- Returns
-
server
= None¶
-
my_application.subscription_auth module¶
-
my_application.subscription_auth.
decode_subscription_token
(token)[source]¶ Decode token.
- Parameters
token –
- Returns
my_application.views module¶
-
my_application.views.
allowed_file
(filename)[source]¶ Check if the file has an allowed file extension.
- Parameters
filename –
- Returns
True/False
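An extension check of this kind is commonly written as below. This is a minimal sketch under stated assumptions: the `ALLOWED_EXTENSIONS` set and the helper names `get_extension` and `is_allowed` are hypothetical, and the extensions the application actually accepts are not listed in this reference.

```python
ALLOWED_EXTENSIONS = {"csv", "proto"}  # hypothetical; the real set is not documented


def get_extension(filename):
    """Return the lowercased text after the last dot, or '' if there is none."""
    if "." not in filename:
        return ""
    return filename.rsplit(".", 1)[1].lower()


def is_allowed(filename):
    """Return True when the file's extension is in the allowed set."""
    return get_extension(filename) in ALLOWED_EXTENSIONS
```

Lowercasing the extension means `data.CSV` and `data.csv` are treated the same way.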
Give authorization only to ADMIN user for some routes.
- Parameters
roles –
- Returns
-
my_application.views.
check_subscription
()[source]¶ Check if user is subscribed to a given competition.
- Returns
True or False
-
my_application.views.
competitions
()[source]¶ Competitions route. Implements two methods:
- GET: retrieve the competitions
- POST: create a new competition
Parameters (POST):
- Competition name
- Datastream
- Description of the competition
- Competition settings: size of the initial batch, initial training time, size of a regular batch, time interval between the batches, name of the label column and the evaluation metric, start and end date, time interval to send the predictions, .proto file
- Returns
Responses: {200: if method == “GET”, return the list of the competitions; if method == “POST”, confirm success, 500: Error}
-
my_application.views.
confirm_email
(token)[source]¶ Check if the confirmation token is correct.
- Parameters
token –
- Returns
-
my_application.views.
delete_subscription
()[source]¶ Remove the subscription for a competition.
- Returns
Response: {200: OK}
-
my_application.views.
download_data
(competition_id)[source]¶ Download the .csv file with the records from the stream that have been published until now.
- Parameters
competition_id – Competition ID
- Returns
.csv file
-
my_application.views.
download_proto_file
(competition_id)[source]¶ Download the proto file for a given competition.
- Parameters
competition_id – Competition ID
- Returns
Responses: {.proto file, 404: File not found}
-
my_application.views.
get_competition_evaluation_measure
(competition_id)[source]¶ Get evaluation measures for a given competition.
- Parameters
competition_id – Competition ID
- Returns
Evaluation measure, or list if there are multiple
-
my_application.views.
get_competition_golden_standard
(competition_id)[source]¶ Retrieve the dataset with labels (training dataset). NOT USED.
- Parameters
competition_id – Competition ID
- Returns
-
my_application.views.
get_competition_info
(competition_id)[source]¶ Retrieve the competition by its id.
- Parameters
competition_id – Competition id
- Returns
The competition object
-
my_application.views.
get_competition_stream
(competition_id)[source]¶ Retrieve the stream. NOT USED.
- Parameters
competition_id – Competition ID
- Returns
-
my_application.views.
get_competitions_by_user
(user_id)[source]¶ Retrieve the list of the competitions to which user is subscribed.
- Parameters
user_id – User ID
- Returns
List of the competitions.
-
my_application.views.
get_datastream_info
(datastream_id)[source]¶ Retrieve information about the dataset.
- Parameters
datastream_id – Dataset id
- Returns
Response: Return the information about the dataset (name, description)
-
my_application.views.
get_datastreams
()[source]¶ Datastreams route. It implements two methods:
- GET: retrieve already existing datasets
- POST: add a new dataset
Parameters (POST):
- Dataset name
- Dataset description
- Dataset file
- Returns
Responses: {If method == “GET”: return the list of the datasets, If method == “POST”: {200: confirm success, 500: Error}}
-
my_application.views.
get_file_extension
(filename)[source]¶ Get the extension of the file.
- Parameters
filename –
- Returns
-
my_application.views.
get_leaderboard_by_competition
(competition_id)[source]¶ Leaderboard route. Retrieves the results of all users for a given competition, to be shown on the leaderboard.
- Parameters
competition_id – Competition ID
- Returns
Ordered list with users and their results
-
my_application.views.
get_messages
(competition_id, field, measure, user_id)[source]¶ Stream live results to the chart.
- Parameters
competition_id – Competition ID
field – Label column
measure – Evaluation metric
user_id – User ID
- Returns
Batch metrics to be shown on the live chart.
-
my_application.views.
get_secret_key
()[source]¶ Retrieve the subscription token for a given user and competition.
- Returns
Responses: {200: Confirm the token, 404: If user is not subscribed}
-
my_application.views.
get_standard_evaluation_measures
()[source]¶ Get all available evaluation measures.
- Returns
List of the evaluation measures
-
my_application.views.
login
()[source]¶ Login route. Provide the credentials (email/password).
- Returns
-
my_application.views.
register
()[source]¶ Registration route. Provides a form to register on the platform.
Parameters:
- First name, last name, email, password
A confirmation e-mail with a token is then sent to validate the registration.
- Returns
Response: {200: confirm success}
-
my_application.views.
results_by_user_by_competition
(competition_id)[source]¶ Retrieve results of the users for a given competition.
- Parameters
competition_id – Competition id
- Returns
List of users with results