Source code for main

import os
from flask import Flask, jsonify, request, Response
from pipert.core.pipeline_manager import PipelineManager
from pipert.utils.useful_methods import open_config_file
import inspect
import zerorpc
from gevent.pywsgi import WSGIServer


class CliConnection(object):
    """RPC-facing wrapper exposing a pipeline manager's public methods.

    An instance of this class is served over zerorpc (see the module's
    start-up code) so that a CLI client can list, inspect and invoke the
    manager's methods by name.
    """

    def __init__(self, pipeline_manager):
        """Store the manager and set up components from the config file.

        The config path is read from the ``CONFIG_PATH`` environment
        variable (empty string when unset).

        Args:
            pipeline_manager: Object whose public bound methods are exposed.
        """
        self.pipeline_manager = pipeline_manager
        components = open_config_file(os.environ.get("CONFIG_PATH", ""))
        # NOTE(review): a ``str`` result is skipped -- presumably
        # open_config_file returns an error message on failure; confirm.
        if not isinstance(components, str):
            self.pipeline_manager.setup_components(components)

    def _resolve_public_method(self, method_name):
        """Return the manager's public bound method called *method_name*.

        Only names that ``get_methods`` would advertise are accepted, so a
        remote caller cannot reach private/dunder attributes or non-method
        attributes through ``getattr``.

        Raises:
            AttributeError: If the name is private, missing, or not a
                bound method of the manager.
        """
        if method_name.startswith("_"):
            raise AttributeError(
                "'{}' is not a public pipeline manager method".format(method_name)
            )
        method = getattr(self.pipeline_manager, method_name)
        if not inspect.ismethod(method):
            raise AttributeError(
                "'{}' is not a public pipeline manager method".format(method_name)
            )
        return method

    def execute_method(self, method_name, parameters_values):
        """Invoke a public manager method with keyword arguments.

        Args:
            method_name: Name of a method listed by ``get_methods``.
            parameters_values: Mapping of parameter name to value; ``None``
                (or an empty mapping) means "call with no arguments".

        Returns:
            Whatever the invoked method returns.

        Raises:
            AttributeError: If *method_name* is not a public method.
        """
        method = self._resolve_public_method(method_name)
        return method(**(parameters_values or {}))

    def get_methods(self):
        """Return the names of the manager's public bound methods."""
        members = inspect.getmembers(self.pipeline_manager,
                                     predicate=inspect.ismethod)
        return [name for name, _ in members if not name.startswith('_')]

    # need to be tested
    def get_method_parameters(self, method_name):
        """Return the parameter names of a public manager method.

        Raises:
            AttributeError: If *method_name* is not a public method.
        """
        method = self._resolve_public_method(method_name)
        return list(inspect.signature(method).parameters)

    def check_connection(self):
        """Liveness probe for clients; always returns ``True``."""
        return True
# Single manager instance shared by whichever front end is started below.
pipeline_manager = PipelineManager()

if os.environ.get("UI", "").lower() != "true":
    # CLI mode: serve the manager over zerorpc. The port comes from the
    # CLI_ENDPOINT environment variable (default "4001").
    cli_server = zerorpc.Server(CliConnection(pipeline_manager))
    cli_server.bind("tcp://0.0.0.0:" + os.environ.get("CLI_ENDPOINT", "4001"))
    cli_server.run()
else:
    # UI mode (UI=true): serve the manager through a small HTTP API.
    app = Flask(__name__)

    def return_response(res_object):
        """Build an HTTP response from a manager result.

        ``res_object`` must provide ``"Message"`` (response body) and
        ``"Succeeded"`` (truthy -> 200, falsy -> 400).
        """
        status = 200 if res_object["Succeeded"] else 400
        return Response(res_object["Message"], status)

    @app.route("/routines", methods=['GET'])
    def get_routines():
        """Return all routine types known to the manager as JSON."""
        return jsonify(pipeline_manager.get_all_routine_types())

    @app.route("/routineParams/<routine_name>", methods=['GET'])
    def get_routine_params(routine_name):
        """Return the parameters of the given routine type as JSON."""
        return jsonify(pipeline_manager.get_routine_parameters(routine_name))

    @app.route("/component", methods=['GET'])
    def get_component():
        """Placeholder endpoint; not implemented yet."""
        return "TBD"

    @app.route("/pipeline", methods=['POST', 'GET'])
    def create_pipeline():
        """GET: describe the expected payload. POST: set up the components."""
        if request.method == 'GET':
            # Static template documenting the JSON shape expected by POST.
            return jsonify(
                [
                    {
                        "name": "name of component",
                        "queues": "[array of names of queues]",
                        "routines": [
                            {
                                "routine_type_name": "name of the routine type",
                                "routine_param_name":
                                    "the values of the routine parameters that can be fount in"
                                    "/routineParams/routineTypeName"
                            }
                        ]
                    }
                ]
            )
        elif request.method == 'POST':
            return return_response(
                pipeline_manager.setup_components(request.json))

    @app.route("/kill", methods=['PUT'])
    def stop_components():
        """Stop every component and report the outcome."""
        return return_response(pipeline_manager.stop_all_components())

    @app.route("/run", methods=['PUT'])
    def start_components():
        """Start every component and report the outcome."""
        # The response must be returned: a view that returns None makes
        # Flask answer with a 500 even when the components started fine.
        return return_response(pipeline_manager.run_all_components())

    port = int(os.environ.get("API_PORT", 5005))
    if os.environ.get("DEBUG", "False").lower() == "true":
        print("Running on Debug")
        app.debug = True
    else:
        print("Running on Prod")
        app.debug = False

    # Blocks forever serving the Flask app through gevent's WSGI server.
    WSGIServer(('0.0.0.0', port), app).serve_forever()