API Reference¶
my_application package¶
Subpackages¶
Submodules¶
my_application.auth module¶
- 
my_application.auth.decode_auth_token(auth_token)[source]¶
- Decode authentication token. - Parameters
- auth_token – 
- Returns
- Responses: {100: Invalid or expired token, 200: Success} 
 
- 
my_application.auth.encode_auth_token(user)[source]¶
- Encode authentication token. - Parameters
- user – 
- Returns
- Responses: {401: Error, 200: Success} 
 
- 
my_application.auth.get_auth_token(user)[source]¶
- Get authentication token for a given user. - Parameters
- user – dictionary {‘id’: user email, ‘password’: user password} 
- Returns
- Log in success if the password is correct and the user has confirmed their email address. Error responses: {404: User is not registered or password is incorrect, 401: User has not confirmed the email address, 500: Internal server error} 
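
The response codes listed for get_auth_token suggest a simple decision order, sketched below. This is only an illustration under stated assumptions: the in-memory `USERS` store, the `confirmed` flag, the `login_status` name and the 200 success code are all hypothetical, and passwords are compared in plain text purely to keep the sketch short.

```python
import hmac

# Hypothetical in-memory user store; the real module presumably queries a database.
USERS = {
    "ada@example.com": {"password": "s3cret", "confirmed": True},
    "bob@example.com": {"password": "hunter2", "confirmed": False},
}


def login_status(user, users=USERS):
    """Return an HTTP-style status code for a login attempt.

    `user` follows the documented shape: {'id': email, 'password': password}.
    """
    record = users.get(user.get("id"))
    supplied = user.get("password") or ""
    # Unknown user and wrong password both map to 404 in the documented responses.
    if record is None or not hmac.compare_digest(record["password"], supplied):
        return 404
    # A registered user who has not confirmed the email address gets 401.
    if not record["confirmed"]:
        return 401
    # Assumed success code; the documentation only says "Log in success".
    return 200
```

Checking the password before the confirmation flag means an attacker who guesses wrong learns nothing about whether the address was confirmed.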
 
my_application.consumer module¶
- 
class my_application.consumer.DataStreamerServicer(server, competition, competition_config)[source]¶
- Bases: object
- Datastream servicer. Handles the communication with users: sends the datastream records and handles the predictions that users send back. It creates the topics, starts the Kafka producers and loads the data structures for communication with users, which are generated from the .proto file. 
- 
class my_application.consumer.ProducerToMongoSink(kafka_server)[source]¶
- Bases: object
- Kafka producer. Publishes messages to a given topic. 
daemon= True¶
 - 
producer= None¶
 
- 
- 
my_application.consumer.receive_predictions(predictions, competition_id, user_id, end_date, kafka_producer, spark_topic, targets, stop)[source]¶
- Receives the predictions from the users and publishes them to a Kafka topic so that the Spark module can read them. - Parameters
- predictions – Batch predictions sent by user 
- competition_id – Competition ID 
- user_id – User ID 
- end_date – Competition end date 
- kafka_producer – Kafka producer for a given user 
- spark_topic – Kafka topic to publish to, so that Spark can read the messages 
- targets – Label columns 
- stop – Kill signal for the thread 
 
- Returns
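
The forwarding step described for receive_predictions can be sketched roughly as follows. The sketch makes several assumptions that are not confirmed by this reference: `stop` is treated as a `threading.Event`, `end_date` as a `datetime`, predictions as dictionaries, and `InMemoryProducer` is a hypothetical stand-in that only mimics a producer's `send(topic, value)` call. The function name `publish_predictions` is also hypothetical.

```python
import json
import threading
from datetime import datetime, timedelta


class InMemoryProducer:
    """Hypothetical stand-in for a Kafka producer; records messages instead of sending."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


def publish_predictions(predictions, competition_id, user_id, end_date,
                        producer, spark_topic, targets, stop):
    """Forward user predictions to `spark_topic`; return how many were published."""
    published = 0
    for prediction in predictions:
        # Stop on the kill signal or once the competition has ended.
        if stop.is_set() or datetime.now() > end_date:
            break
        # Skip predictions that lack one of the label columns.
        if any(target not in prediction for target in targets):
            continue
        message = dict(prediction, competition_id=competition_id, user_id=user_id)
        producer.send(spark_topic, json.dumps(message).encode("utf-8"))
        published += 1
    return published


producer = InMemoryProducer()
count = publish_predictions(
    [{"rowID": 1, "target": 0.5}], "c1", "u1",
    datetime.now() + timedelta(hours=1), producer, "spark-topic",
    ["target"], threading.Event(),
)
```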
 
my_application.producer module¶
- 
class my_application.producer.CompetitionProducer(server)[source]¶
- Bases: object
create_competition(competition, items, predictions, initial_batch)[source]¶
- Create a competition and start releasing the data stream. 
 - 
daemon= True¶
 - 
main(topic, initial_batch, items, predictions, initial_training_time, batch_size, time_interval, predictions_time_interval, spark_topic, competition_id)[source]¶
- Recreates the stream. Sends the data in batches: first test (without the target value) and then train batches. All batches are sent according to the time intervals set for the current competition. - Parameters
- topic – 
- initial_batch – 
- items – 
- predictions – 
- initial_training_time – 
- batch_size – 
- time_interval – 
- predictions_time_interval – 
- spark_topic – 
- competition_id – 
 
- Returns
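
One reading of the batch order described for main (a test batch without the target value, followed by the matching train batch) is sketched below. The helper name `batch_schedule` and the record layout are hypothetical, and the waiting between sends (time_interval, predictions_time_interval) is left out so that the example stays deterministic.

```python
def batch_schedule(records, targets, batch_size):
    """Yield (kind, batch) pairs: each test batch is followed by its train batch."""
    for start in range(0, len(records), batch_size):
        batch = records[start:start + batch_size]
        # The test batch hides the label columns from the users.
        test_batch = [
            {key: value for key, value in row.items() if key not in targets}
            for row in batch
        ]
        yield "test", test_batch
        # The train batch releases the same rows with their labels.
        yield "train", batch


records = [{"x": 1, "y": 10}, {"x": 2, "y": 20}, {"x": 3, "y": 30}]
for kind, batch in batch_schedule(records, targets=["y"], batch_size=2):
    print(kind, batch)
```

A real producer would presumably pause between the two yields so that users have time to submit predictions before the labels are released.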
 
 - 
producer= None¶
 
- 
- 
class my_application.producer.Scheduler[source]¶
- Bases: object
- Job scheduler. Stores new competitions in MongoDBJobStore and starts each competition on its start date. 
- 
my_application.producer.read_csv_file(competition, competition_config, data_format='csv')[source]¶
- Reads the data from a .csv file for a given competition. - Parameters
- competition – Competition object 
- competition_config – Competition configuration 
- data_format – Format of the data. For now, the only format supported is ‘csv’ 
 
- Returns
- The data read, split into batches (without the target value), the target values, and the initial batch (with target values) 
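
The three return values can be illustrated with a small stand-alone sketch. It is not the module's implementation: `split_stream`, its parameters and the single `target` column are assumptions, and the CSV is read from a string rather than from a competition's file.

```python
import csv
import io


def split_stream(csv_text, target, initial_batch_size, batch_size):
    """Split CSV rows into (batches without target, target values, initial batch)."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    # The initial batch keeps its target values so users can train on it.
    initial_batch = rows[:initial_batch_size]
    remaining = rows[initial_batch_size:]
    batches, targets = [], []
    for start in range(0, len(remaining), batch_size):
        chunk = remaining[start:start + batch_size]
        # Features only: the target column is withheld from the released batch.
        batches.append([{k: v for k, v in row.items() if k != target} for row in chunk])
        targets.append([row[target] for row in chunk])
    return batches, targets, initial_batch


data = "x,y\n1,10\n2,20\n3,30\n4,40\n5,50\n"
batches, targets, initial_batch = split_stream(data, "y", initial_batch_size=1, batch_size=2)
```

Note that `csv.DictReader` yields every value as a string; converting to numeric types is left to the caller.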
 
my_application.repository module¶
- 
class my_application.repository.MongoRepository(mongo_host)[source]¶
- Bases: object
client= None¶
 - 
create_collection(db_name, collection_name)[source]¶
- Create a collection inside the database db_name. - Parameters
- db_name – Name of the database 
- collection_name – Name of the collection 
- Returns
 - 
create_database(db_name)[source]¶
- Create a database in MongoDB. - Parameters
- db_name – Name of the database 
- Returns
 - 
get_competition_data_records(competition_id)[source]¶
- Fetch the datastream. - Parameters
- competition_id – Competition ID 
- Returns
- Datastream where each record(row) is a dictionary 
 
 - 
get_competition_evaluation_measures(competition_id)[source]¶
- Fetch the evaluation metrics for a given competition. - Parameters
- competition_id – Competition ID 
- Returns
- List of metrics 
 - 
get_last_predictions_by_user(competition_id, now, field, measure, user_id, evaluation_time_interval)[source]¶
- NOT USED! Retrieve the latest predictions sent by user. - Parameters
- competition_id – 
- now – 
- field – 
- measure – 
- user_id – 
- evaluation_time_interval – 
- Returns
- Dictionary with the computed evaluation metric for a given user 
 - 
get_results_by_user(competition_id, field, measure, user_id)[source]¶
- Fetch evaluation metric values for a given user and competition. - Parameters
- competition_id – Competition ID 
- field – Label column name 
- measure – Evaluation metric 
- user_id – User ID 
- Returns
- Dictionary with the computed evaluation metric for a given user 
 - 
get_standard_evaluation_measures()[source]¶
- Retrieve the standard set of evaluation metrics offered on the platform. - Returns
- List of measures (by name: MSE, MAPE, ACC…) 
 - 
get_users_ranking_by_field_by_measure(competition_id, field, measure)[source]¶
- Retrieve rankings of users for a specific label column and evaluation metric. - Parameters
- competition_id – Competition ID 
- field – Label column name 
- measure – Evaluation metric 
- Returns
- List of dictionaries with the computed evaluation metric for all users 
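
Ranking users by a measure could look roughly like the sketch below. The result layout, the `rank_users` name and the rule that error measures rank ascending while ACC ranks descending are assumptions; the repository's actual sort order is not documented here.

```python
# Assumption: for accuracy-like measures a higher value is better;
# for error measures such as MSE and MAPE a lower value is better.
HIGHER_IS_BETTER = {"ACC"}


def rank_users(results, measure):
    """Return the per-user result dictionaries ordered from best to worst."""
    best_first_descending = measure in HIGHER_IS_BETTER
    return sorted(results, key=lambda row: row[measure], reverse=best_first_descending)


results = [
    {"user_id": "a", "MSE": 0.4, "ACC": 0.95},
    {"user_id": "b", "MSE": 0.1, "ACC": 0.90},
    {"user_id": "c", "MSE": 0.3, "ACC": 0.80},
]
by_mse = [row["user_id"] for row in rank_users(results, "MSE")]
by_acc = [row["user_id"] for row in rank_users(results, "ACC")]
```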
 
- 
my_application.sparkEvaluator module¶
- 
my_application.sparkEvaluator.evaluate(spark_context, kafka_broker, competition, competition_config, window_duration, prediction_window_duration, train_schema, prediction_schema, columns_to_sum, checkpoints, targets)[source]¶
- The function for online evaluation of the incremental predicting models. It computes selected evaluation measures and stores them in the database. - Parameters
- spark_context – 
- kafka_broker – Address of kafka server 
- competition – Competition object 
- competition_config – Competition configuration dictionary 
- window_duration – Time (in seconds) to keep the test batches in the memory when performing a join 
- prediction_window_duration – Time (in seconds) to keep the predictions in the memory when performing a join 
- train_schema – Column names and types in the training batch 
- prediction_schema – Column names and types in the prediction batch 
- columns_to_sum – Columns to aggregate for evaluation 
- checkpoints – Checkpoint locations for Spark jobs 
- targets – target column names 
 
- Returns
- Writes to MongoDB: instances of the stream, predictions by users and evaluation measures for each user 
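
The evaluation idea (join released records with predictions, then compute the measures) can be shown without Spark. The sketch below is plain Python under stated assumptions: records and predictions are keyed by a hypothetical row ID, and the formulas are the textbook definitions of MSE, MAPE and accuracy, not necessarily the exact ones the module computes.

```python
def join_on_id(truth, predictions):
    """Pair true and predicted values for the row IDs present in both inputs."""
    shared_ids = sorted(truth.keys() & predictions.keys())
    return [(truth[row_id], predictions[row_id]) for row_id in shared_ids]


def mse(pairs):
    """Mean squared error."""
    return sum((t - p) ** 2 for t, p in pairs) / len(pairs)


def mape(pairs):
    """Mean absolute percentage error; undefined when a true value is zero."""
    return 100 * sum(abs((t - p) / t) for t, p in pairs) / len(pairs)


def acc(pairs):
    """Share of predictions that match the true value exactly."""
    return sum(1 for t, p in pairs if t == p) / len(pairs)


truth = {1: 2.0, 2: 4.0, 3: 5.0}
predictions = {1: 3.0, 2: 4.0}  # row 3 has no prediction, so the join drops it
pairs = join_on_id(truth, predictions)
```

In the streaming setting the two window durations bound how long each side of this join is kept in memory, so a prediction that arrives after its record's window has expired would not be matched.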
 
my_application.sparkToMongo module¶
- 
class my_application.sparkToMongo.SparkToMongo(kafka_server, prediction_topic, golden_topic, measures_topic, competition, configuration)[source]¶
- Bases: object
- Consumer of Spark data. Implements a Kafka consumer that receives data from Spark and stores it in MongoDB. It handles messages from three topics: evaluation metrics (a.k.a. measures), predictions, and original data records (a.k.a. golden). 
process_golden(mess)[source]¶
- Receive the original data records and write them to the ‘golden_standard’ database in MongoDB. - Parameters
- mess – Message as JSON 
- Returns
 - 
process_measures(mess, previous_batch, now)[source]¶
- Process the messages on the measures topic. Write to the ‘evaluation_measures’ database. - Parameters
- mess – Current message 
- previous_batch – Previous message 
- now – Timestamp ‘now’, used to track the time interval between the messages 
- Returns
 
- 
my_application.stream_server module¶
- 
class my_application.stream_server.StreamServer(server_port, options)[source]¶
- Bases: object
- StreamServer class. It handles the communication through gRPC/Protobuf. 
add_server(streamer, competition)[source]¶
- Define the communication protocol: import the methods and data structures that define it from the files generated by compiling the .proto file. - Parameters
- streamer – 
- competition – 
- Returns
 - 
server= None¶
 
- 
my_application.subscription_auth module¶
- 
my_application.subscription_auth.decode_subscription_token(token)[source]¶
- Decode token. - Parameters
- token – 
- Returns
 
my_application.views module¶
- 
my_application.views.allowed_file(filename)[source]¶
- Check if the file has an allowed file extension. - Parameters
- filename – 
- Returns
- True/False 
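
An extension check of this kind is commonly written as below. This is a sketch, not the module's code: the `ALLOWED_EXTENSIONS` set is hypothetical (the reference does not list the accepted extensions), and the extension helper mirrors the documented get_file_extension only in name.

```python
# Hypothetical whitelist; the extensions actually accepted are not documented here.
ALLOWED_EXTENSIONS = {"csv", "proto"}


def get_file_extension(filename):
    """Return the lower-cased text after the last dot, or '' if there is no dot."""
    if "." not in filename:
        return ""
    return filename.rsplit(".", 1)[1].lower()


def allowed_file(filename):
    """Return True if the file name ends with a whitelisted extension."""
    return get_file_extension(filename) in ALLOWED_EXTENSIONS
```

Lower-casing the extension makes `data.CSV` and `data.csv` behave the same way; the check looks only at the name, not at the file content.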
 
- Grant authorization only to the ADMIN user for some routes. - Parameters
- roles – 
- Returns
 
- 
my_application.views.check_subscription()[source]¶
- Check if the user is subscribed to a given competition. - Returns
- True or False 
- 
my_application.views.competitions()[source]¶
- Competitions route. Implements two methods: 
- GET: retrieve the competitions 
- POST: create a new competition. Parameters: competition name; datastream; description of the competition; competition settings (size of the initial batch, initial training time, size of a regular batch, time interval between the batches, name of the label column and the evaluation metric, start and end date, time interval to send the predictions, .proto file) 
- Returns
- Responses: {200: for GET, the list of the competitions; for POST, confirmation of success, 500: Error} 
 
- 
my_application.views.confirm_email(token)[source]¶
- Check if the confirmation token is correct. - Parameters
- token – 
- Returns
 
- 
my_application.views.delete_subscription()[source]¶
- Removes the subscription for a competition. - Returns
- Response: {200: OK} 
- 
my_application.views.download_data(competition_id)[source]¶
- Download the .csv file with the records from the stream that have been published until now. - Parameters
- competition_id – Competition ID 
- Returns
- .csv file 
 
- 
my_application.views.download_proto_file(competition_id)[source]¶
- Download the proto file for a given competition. - Parameters
- competition_id – Competition ID 
- Returns
- Responses: {.proto file, 404: File not found} 
- 
my_application.views.get_competition_evaluation_measure(competition_id)[source]¶
- Get evaluation measures for a given competition. - Parameters
- competition_id – Competition ID 
- Returns
- Evaluation measure, or list if there are multiple 
 
- 
my_application.views.get_competition_golden_standard(competition_id)[source]¶
- Retrieve the dataset with labels (training dataset). NOT USED. - Parameters
- competition_id – Competition ID 
- Returns
 
- 
my_application.views.get_competition_info(competition_id)[source]¶
- Retrieve the competition by its id. - Parameters
- competition_id – Competition id 
- Returns
- The competition object 
 
- 
my_application.views.get_competition_stream(competition_id)[source]¶
- Retrieve the stream. NOT USED. - Parameters
- competition_id – Competition ID 
- Returns
 
- 
my_application.views.get_competitions_by_user(user_id)[source]¶
- Retrieve the list of the competitions to which user is subscribed. - Parameters
- user_id – User ID 
- Returns
- List of the competitions. 
 
- 
my_application.views.get_datastream_info(datastream_id)[source]¶
- Retrieve information about the dataset. - Parameters
- datastream_id – Dataset id 
- Returns
- Response: Return the information about the dataset (name, description) 
 
- 
my_application.views.get_datastreams()[source]¶
- Datastreams route. Implements two methods: 
- GET: retrieve the existing datasets 
- POST: add a new dataset. Parameters: dataset name, dataset description, dataset file 
- Returns
- Responses: {GET: the list of the datasets, POST: {200: confirm success, 500: Error}} 
 
- 
my_application.views.get_file_extension(filename)[source]¶
- Get the extension of the file. - Parameters
- filename – 
- Returns
- 
my_application.views.get_leaderboard_by_competition(competition_id)[source]¶
- Leaderboard route. Retrieves the results, for a given competition, for all the users to show it on the leaderboard. - Parameters
- competition_id – Competition ID 
- Returns
- Ordered list with users and their results 
 
- 
my_application.views.get_messages(competition_id, field, measure, user_id)[source]¶
- Stream live results to the chart. - Parameters
- competition_id – Competition ID 
- field – Label column 
- measure – Evaluation metric 
- user_id – User ID 
 
- Returns
- Batch metrics to be shown on the live chart. 
 
- 
my_application.views.get_secret_key()[source]¶
- Retrieve the subscription token for a given user and competition. - Returns
- Responses: {200: Confirm the token, 404: If user is not subscribed} 
 
- 
my_application.views.get_standard_evaluation_measures()[source]¶
- Get all available evaluation measures. - Returns
- List of the evaluation measures 
- 
my_application.views.login()[source]¶
- Login route. Provide the credentials (email/password). - Returns
 
- 
my_application.views.register()[source]¶
- Registration route. Provides a form to register on the platform. Parameters: first name, last name, email, password. A confirmation e-mail with a token is then sent to validate the registration. - Returns
- Response: {200: confirm success} 
 
- 
my_application.views.results_by_user_by_competition(competition_id)[source]¶
- Retrieve results of the users for a given competition. - Parameters
- competition_id – Competition id 
- Returns
- List of users with results