# Source code for my_application.stream_server


# Copyright 2020 Nedeljko Radulovic, Dihia Boulegane, Albert Bifet
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import time
import grpc
from concurrent import futures
import json
import os
from importlib.machinery import SourceFileLoader

# Sleep interval used by the keep-alive loops in this module.
_ONE_DAY_IN_SECONDS = 60 * 60 * 24

# config.json is looked up next to this module so the package works from any
# checkout location. The absolute path of the original development checkout is
# kept only as a fallback for installations that still rely on it.
_LEGACY_CONFIG_PATH = '/home/nedeljko/PycharmProjects/Competition_to_publish/SCALAR/provider/my_application/config.json'
_CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.json')
if not os.path.isfile(_CONFIG_PATH):
    _CONFIG_PATH = _LEGACY_CONFIG_PATH

with open(_CONFIG_PATH, encoding='utf-8') as json_data_file:
    config = json.load(json_data_file)


# Root directory and sub-directory under which the per-competition generated
# gRPC code is looked up (joined with the competition name in add_server).
_UPLOAD_REPO = config['UPLOAD_REPO']
_COMPETITION_GENERATED_CODE = config['COMPETITION_GENERATED_CODE']


class StreamServer:
    """
    StreamServer class.

    It handles the communication through gRPC/Protobuf: it owns a
    ``grpc.server`` instance, registers the servicer generated for a
    competition on it and starts it on the configured port.
    """

    # Class-level default; __init__ replaces it with a per-instance gRPC server.
    server = None

    def __init__(self, server_port, options, max_workers=100):
        """
        Create the gRPC server to be able to communicate with multiple users.

        :param server_port: address later passed to ``add_insecure_port`` by
            :meth:`start_server`
        :param options: forwarded as ``options`` to ``grpc.server``
        :param max_workers: size of the thread pool serving requests
            (defaults to 100)
        """
        if self.server is None:
            self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers),
                                      options=options)
        # Always remember the port, so start_server() works even when a server
        # object was already present and the branch above was skipped.
        self.port = server_port

    def add_server(self, streamer, competition):
        """
        Define the communication protocol, e.g. import the methods and data
        structures that define the protocol from the files that are generated
        after compiling the .proto file, and register ``streamer`` on the server.

        The generated module is read from
        ``<_UPLOAD_REPO>/<_COMPETITION_GENERATED_CODE>/<competition.name>/file_pb2_grpc.py``.

        :param streamer: servicer object handed to
            ``add_DataStreamerServicer_to_server``
        :param competition: object whose ``name`` attribute selects the
            directory holding the generated code
        :return: None
        :raises ImportError: if no import spec can be built for the file
        """
        # Local imports keep this method self-contained; the deprecated
        # ``SourceFileLoader.load_module()`` is replaced by the spec-based API.
        import importlib.util
        import sys

        module_name = 'file_pb2_grpc'
        pb2_grpc_file_path = os.path.join(_UPLOAD_REPO, _COMPETITION_GENERATED_CODE, competition.name,
                                          'file_pb2_grpc.py')

        spec = importlib.util.spec_from_file_location(module_name, pb2_grpc_file_path)
        if spec is None or spec.loader is None:
            raise ImportError('Cannot load generated gRPC module from ' + pb2_grpc_file_path)
        file_pb2_grpc = importlib.util.module_from_spec(spec)

        # Register the module under its name while it executes (as load_module
        # did); on failure restore whatever was registered before.
        previous = sys.modules.get(module_name)
        sys.modules[module_name] = file_pb2_grpc
        try:
            spec.loader.exec_module(file_pb2_grpc)
        except BaseException:
            if previous is None:
                sys.modules.pop(module_name, None)
            else:
                sys.modules[module_name] = previous
            raise

        file_pb2_grpc.add_DataStreamerServicer_to_server(streamer, self.server)

    def start_server(self):
        """
        Bind the server to ``self.port`` (insecure, no TLS credentials) and
        start serving.
        """
        self.server.add_insecure_port(self.port)
        self.server.start()

    def _wait_forever(self):
        """
        Block the calling thread until a ``KeyboardInterrupt`` arrives, then
        stop the server by calling ``stop(None)``.
        """
        try:
            while True:
                time.sleep(_ONE_DAY_IN_SECONDS)
        except KeyboardInterrupt:
            self.server.stop(None)
if __name__ == "__main__":
    # Running the module directly only keeps the process alive. No server
    # object exists at module scope, so there is nothing to stop on Ctrl-C;
    # just leave the loop and let the interpreter exit.
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        pass