import os
from flask import Flask, jsonify, request, Response
from pipert.core.pipeline_manager import PipelineManager
from pipert.utils.useful_methods import open_config_file
import inspect
import zerorpc
from gevent.pywsgi import WSGIServer
[docs]class CliConnection(object):
def __init__(self, pipeline_manager):
self.pipeline_manager = pipeline_manager
components = open_config_file(os.environ.get("CONFIG_PATH", ""))
if not isinstance(components, str):
self.pipeline_manager.setup_components(components)
[docs] def execute_method(self, method_name, parameters_values):
return getattr(self.pipeline_manager, method_name)(**parameters_values)
[docs] def get_methods(self):
methods = inspect.getmembers(self.pipeline_manager, predicate=inspect.ismethod)
methods = [method[0] for method in methods if (not method[0].startswith('_'))]
return methods
# need to be tested
[docs] def get_method_parameters(self, method_name):
return list(inspect.signature(getattr(self.pipeline_manager, method_name)).parameters.keys())
[docs] def check_connection(self):
return True
[docs]pipeline_manager = PipelineManager()
if not os.environ.get("UI", "").lower() == 'true':
[docs] cli_server = zerorpc.Server(CliConnection(pipeline_manager))
cli_server.bind("tcp://0.0.0.0:" + os.environ.get("CLI_ENDPOINT", "4001"))
cli_server.run()
else:
app = Flask(__name__)
def return_response(res_object):
return Response(res_object["Message"], 200 if res_object["Succeeded"] else 400)
@app.route("/routines", methods=['GET'])
def get_routines():
return jsonify(pipeline_manager.get_all_routine_types())
@app.route("/routineParams/<routine_name>", methods=['GET'])
def get_routine_params(routine_name):
return jsonify(pipeline_manager.get_routine_parameters(routine_name))
@app.route("/component", methods=['GET'])
def get_component():
return "TBD"
@app.route("/pipeline", methods=['POST', 'GET'])
def create_pipeline():
if request.method == 'GET':
return jsonify(
[
{
"name": "name of component",
"queues": "[array of names of queues]",
"routines":
[
{
"routine_type_name": "name of the routine type",
"routine_param_name": "the values of the routine parameters that can be fount in"
"/routineParams/routineTypeName"
}
]
}
]
)
elif request.method == 'POST':
return return_response(pipeline_manager.setup_components(request.json))
@app.route("/kill", methods=['PUT'])
def stop_components():
return return_response(pipeline_manager.stop_all_components())
@app.route("/run", methods=['PUT'])
def start_components():
return_response(pipeline_manager.run_all_components())
port = int(os.environ.get("API_PORT", 5005))
if os.environ.get("DEBUG", "False").lower() == "true":
print("Running on Debug")
app.debug = True
else:
print("Running on Prod")
app.debug = False
WSGIServer(('0.0.0.0', port), app).serve_forever()