diff --git a/config.py b/config.py index 6ce3269..93c452f 100644 --- a/config.py +++ b/config.py @@ -41,6 +41,26 @@ class Config(object): SUPPORTED_EMBEDDINGS = ['openai.text-embedding-3-small', 'mistral.mistral-embed'] SUPPORTED_LLMS = ['openai.gpt-4-turbo', 'openai.gpt-3.5-turbo', 'mistral.mistral-large-2402'] + # Celery settings + CELERY_TASK_SERIALIZER = 'json' + CELERY_ACCEPT_CONTENT = ['json'] + CELERY_TIMEZONE = 'UTC' + CELERY_ENABLE_UTC = True + + # Define multiple queues + CELERY_TASK_QUEUES = { + 'embeddings': { + 'exchange': 'embeddings', + 'routing_key': 'embeddings.key', + 'queue_arguments': {'x-max-priority': 10} + }, + 'llm_interactions': { + 'exchange': 'llm_interactions', + 'routing_key': 'llm_interactions.key', + 'queue_arguments': {'x-max-priority': 5} + } + } + class DevConfig(Config): DEVELOPMENT = True @@ -61,6 +81,14 @@ class DevConfig(Config): CELERY_BROKER_URL = 'redis://localhost:6379/0' # Default Redis configuration CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' + # OpenAI API Keys + OPENAI_API_KEY = 'sk-proj-8R0jWzwjL7PeoPyMhJTZT3BlbkFJLb6HfRB2Hr9cEVFWEhU7' + + # Unstructured settings + UNSTRUCTURED_API_KEY = 'pDgCrXumYhM3CNvjvwV8msMldXC3uw' + UNSTRUCTURED_BASE_URL = 'https://flowitbv-16c4us0m.api.unstructuredapp.io' + UNSTRUCTURED_FULL_URL = 'https://flowitbv-16c4us0m.api.unstructuredapp.io/general/v0/general' + class ProdConfig(Config): DEVELOPMENT = False diff --git a/eveai_app/__init__.py b/eveai_app/__init__.py index ba47248..867b0c1 100644 --- a/eveai_app/__init__.py +++ b/eveai_app/__init__.py @@ -5,12 +5,14 @@ from flask_security import SQLAlchemyUserDatastore from flask_security.signals import user_authenticated from werkzeug.middleware.proxy_fix import ProxyFix import logging.config +from celery import Celery from .extensions import db, migrate, bootstrap, security, mail, login_manager, cors from .models.user import User, Tenant, Role from .models.document import Document, DocumentLanguage, DocumentVersion from .logging_config import LOGGING from .utils.security import set_tenant_session_data +from .worker.celery_utils import init_celery def create_app(config_file=None): @@ -43,6 +45,9 @@ def create_app(config_file=None): mail_logger.setLevel(logging.DEBUG) security_logger = logging.getLogger('flask_security') security_logger.setLevel(logging.DEBUG) + sqlalchemy_logger = logging.getLogger('sqlalchemy.engine') + sqlalchemy_logger.setLevel(logging.DEBUG) + # sqlalchemy_logger.addHandler(logging.StreamHandler()) # Register API register_api(app) @@ -73,3 +78,18 @@ def register_api(app): pass # from . import api # app.register_blueprint(api.bp, url_prefix='/api') + + +def create_celery_app(config_file=None): + app = Flask(__name__) + if config_file is None: + app.config.from_object('config.DevConfig') + else: + app.config.from_object(config_file) + + celery = Celery(app.import_name) + init_celery(celery, app) + return celery + + +celery = create_celery_app() diff --git a/eveai_app/controllers/document_controller.py b/eveai_app/controllers/document_controller.py deleted file mode 100644 index d18f24a..0000000 --- a/eveai_app/controllers/document_controller.py +++ /dev/null @@ -1,3 +0,0 @@ -from ..models.document import Document, DocumentLanguage, DocumentVersion - -def process_document(file, name, language, valid_from): diff --git a/eveai_app/models/document.py b/eveai_app/models/document.py index eb06a0b..4aeed19 100644 --- a/eveai_app/models/document.py +++ b/eveai_app/models/document.py @@ -27,6 +27,7 @@ class DocumentLanguage(db.Model): id = db.Column(db.Integer, primary_key=True) document_id = db.Column(db.Integer, db.ForeignKey(Document.id), nullable=False) language = db.Column(db.String(2), nullable=False) + latest_version_id = db.Column(db.Integer, db.ForeignKey('document_version.id'), nullable=True) # Versioning Information created_at = db.Column(db.DateTime, nullable=False, server_default=db.func.now()) @@ -35,7 +36,17 @@ class DocumentLanguage(db.Model): updated_by = db.Column(db.Integer, db.ForeignKey(User.id)) # Relations - versions = db.relationship('DocumentVersion', backref='document_language', lazy=True) + versions = db.relationship( + 'DocumentVersion', + backref='document_language', + lazy='joined', + foreign_keys='DocumentVersion.doc_lang_id' + ) + latest_version = db.relationship( + 'DocumentVersion', + uselist=False, + foreign_keys=[latest_version_id] + ) def __repr__(self): return f"" diff --git a/eveai_app/templates/document/documents.html b/eveai_app/templates/document/documents.html new file mode 100644 index 0000000..94a239b --- /dev/null +++ b/eveai_app/templates/document/documents.html @@ -0,0 +1,18 @@ +{% extends 'base.html' %} +{% from 'macros.html' import render_nested_table, render_pagination %} + +{% block title %}Documents{% endblock %} + +{% block content_title %}Documents{% endblock %} +{% block content_description %}View Documents for Tenant{% endblock %} + +{% block content %} +
+ + {{ render_nested_table(headers=["Name", "Created At", "Valid From", "Languages & Versions"], rows=rows) }} +
+{% endblock %} + +{% block content_footer %} + {{ render_pagination(pagination, 'document_bp.documents') }} +{% endblock %} \ No newline at end of file diff --git a/eveai_app/templates/macros.html b/eveai_app/templates/macros.html index bfd73e5..cd6a7ea 100644 --- a/eveai_app/templates/macros.html +++ b/eveai_app/templates/macros.html @@ -22,3 +22,151 @@ {% endif %} {% endif %} {% endmacro %} + +{% macro render_table(headers, rows) %} +
+
+ + + + {% for header in headers %} + + {% endfor %} + + + + {% for row in rows %} + + {% for cell in row %} + + {% endfor %} + + {% endfor %} + +
{{ header }}
+ {% if cell.type == 'image' %} +
+
+ +
+
+ {% elif cell.type == 'text' %} +

{{ cell.value }}

+ {% elif cell.type == 'badge' %} + {{ cell.value }} + {% elif cell.type == 'link' %} + {{ cell.value }} + {% else %} + {{ cell.value }} + {% endif %} +
+
+
+{% endmacro %} + +{% macro render_accordion(accordion_id, accordion_items, header_title, header_description) %} +
+
+
+
+

{{ header_title }}

+

{{ header_description }}

+
+
+
+
+
+ {% for item in accordion_items %} +
+
+ +
+
+
+ {{ item.content }} +
+
+
+ {% endfor %} +
+
+
+
+
+{% endmacro %} + +{% macro render_nested_table(headers, rows) %} +
+
+ + + + {% for header in headers %} + + {% endfor %} + + + + {% for row in rows %} + + {% for cell in row %} + {% if cell.is_group %} + + {% else %} + + {% endif %} + {% endfor %} + + {% endfor %} + +
{{ header }}
+ {{ render_nested_table(cell.headers, cell.sub_rows) }} + + {% if cell.type == 'image' %} +
+
+ +
+
+ {% elif cell.type == 'text' %} +

{{ cell.value }}

+ {% elif cell.type == 'badge' %} + {{ cell.value }} + {% elif cell.type == 'link' %} + {{ cell.value }} + {% else %} + {{ cell.value }} + {% endif %} +
+
+
+{% endmacro %} + +{% macro render_pagination(pagination, endpoint) %} + +{% endmacro %} + diff --git a/eveai_app/utils/database.py b/eveai_app/utils/database.py index 081c09b..eb38ce8 100644 --- a/eveai_app/utils/database.py +++ b/eveai_app/utils/database.py @@ -4,7 +4,7 @@ from sqlalchemy import text from sqlalchemy.schema import CreateSchema from sqlalchemy.exc import InternalError from sqlalchemy.orm import sessionmaker, scoped_session -from flask_migrate import heads +from flask import current_app from ..extensions import db, migrate @@ -37,9 +37,10 @@ class Database: db.session.execute(CreateSchema(self.schema)) db.session.execute(text(f"CREATE EXTENSION IF NOT EXISTS pgvector SCHEMA {self.schema}")) db.session.commit() - except InternalError: + except InternalError as e: db.session.rollback() db.session.close() + current_app.logger.error(f"Error creating schema {self.schema}: {e.args}") def create_tables(self): """create tables in for schema""" diff --git a/eveai_app/views/basic_forms.py b/eveai_app/views/basic_forms.py index c364c28..3eb3599 100644 --- a/eveai_app/views/basic_forms.py +++ b/eveai_app/views/basic_forms.py @@ -16,7 +16,7 @@ class SessionDefaultsForm(FlaskForm): default_llm_model = SelectField('Default LLM Model', choices=[], validators=[DataRequired()]) def __init__(self): - super(SessionDefaultsForm, self).__init__() + super().__init__() self.user_name.data = current_user.user_name self.user_email.data = current_user.email self.tenant_name.data = session.get('tenant').get('name') diff --git a/eveai_app/views/document_forms.py b/eveai_app/views/document_forms.py index 7c36ee9..3102d5f 100644 --- a/eveai_app/views/document_forms.py +++ b/eveai_app/views/document_forms.py @@ -10,14 +10,20 @@ class AddDocumentForm(FlaskForm): file = FileField('File', validators=[FileAllowed(['pdf', 'txt']), FileRequired()]) name = StringField('Name', validators=[Length(max=100)]) - language = StringField('Language', validators=[Length(max=2)]) + language = SelectField('Language', choices=[], validators=[Optional()]) valid_from = DateField('Valid from', id='form-control datepicker', validators=[Optional()]) doc_embedding_model = SelectField('Default Embedding Model', choices=[], validators=[DataRequired()]) submit = SubmitField('Submit') def __init__(self): - super(AddDocumentForm, self).__init__() + super().__init__() + self.language.choices = [(language, language) for language in + session.get('tenant').get('allowed_languages')] + self.language.data = session.get('default_language') self.doc_embedding_model.choices = [(model, model) for model in session.get('tenant').get('allowed_embedding_models')] self.doc_embedding_model.data = session.get('default_embedding_model') + + + diff --git a/eveai_app/views/document_views.py b/eveai_app/views/document_views.py index a87f299..b2cce10 100644 --- a/eveai_app/views/document_views.py +++ b/eveai_app/views/document_views.py @@ -2,12 +2,13 @@ import os from datetime import datetime as dt, timezone as tz from flask import request, redirect, url_for, flash, render_template, Blueprint, session, current_app from flask_security import hash_password, roles_required, roles_accepted, current_user +from sqlalchemy import desc +from sqlalchemy.orm import joinedload from werkzeug.utils import secure_filename from ..models.document import Document, DocumentLanguage, DocumentVersion from ..extensions import db from .document_forms import AddDocumentForm -from ..utils.database import Database from ..utils.middleware import mw_before_request document_bp = Blueprint('document_bp', __name__, url_prefix='/document') @@ -37,74 +38,177 @@ def add_document(): else: new_doc.name = form.name.data - timestamp = dt.now(tz.utc) if form.valid_from.data or form.valid_from.data != '': new_doc.valid_from = form.valid_from.data else: - new_doc.valid_from = timestamp - + new_doc.valid_from = dt.now(tz.utc) new_doc.tenant_id = session['tenant']['id'] - - new_doc.created_at = timestamp - new_doc.updated_at = timestamp - new_doc.created_by = current_user.id - new_doc.updated_by = current_user.id + set_logging_information(new_doc, dt.now(tz.utc)) # Create the DocumentLanguage - new_doc_lang = DocumentLanguage() - if form.language.data == '': - new_doc_lang.language = session['default_language'] - else: - new_doc_lang.language = form.language.data - - new_doc_lang.document = new_doc - - new_doc_lang.created_at = timestamp - new_doc_lang.updated_at = timestamp - new_doc_lang.created_by = current_user.id - new_doc_lang.updated_by = current_user.id + new_doc_lang = create_language_for_document(new_doc, form.language.data) # Create the DocumentVersion new_doc_vers = DocumentVersion() new_doc_vers.document_language = new_doc_lang - - new_doc_vers.created_at = timestamp - new_doc_vers.updated_at = timestamp - new_doc_vers.created_by = current_user.id - new_doc_vers.updated_by = current_user.id + set_logging_information(new_doc_vers, dt.now(tz.utc)) try: db.session.add(new_doc) db.session.add(new_doc_lang) db.session.add(new_doc_vers) db.session.commit() + new_doc_lang.latest_version = new_doc_vers + db.session.commit() except Exception as e: db.session.rollback() error = e.args # Save the file and process the document if error is None: - flash('Document added successfully.') - new_doc_vers.file_type = extension - new_doc_vers.file_name = new_doc_vers.calc_file_name() - new_doc_vers.file_location = new_doc_vers.calc_file_location() - - upload_path = os.path.join(current_app.config['UPLOAD_FOLDER'], new_doc_vers.file_location) - if not os.path.exists(upload_path): - os.makedirs(upload_path, exist_ok=True) - file.save(os.path.join(upload_path, new_doc_vers.file_name)) - try: - db.session.commit() - except Exception as e: - db.session.rollback() - error = e.args - if error is None: - flash('Document saved successfully.') - # TODO: processing of document to embeddings (async) - flash('Document processing started.') - else: - flash('Error saving document.') + flash('Document added successfully.', 'success') + upload_file_for_version(new_doc_vers, file, extension) else: - flash('Error adding document.') + flash('Error adding document.', 'error') + current_app.logger.error(f'Error adding document for tenant {session["tenant"]["id"]}: {error}') - return render_template('document/add_document.html', form=form) + # return render_template('document/add_document.html', form=form) + + +@document_bp.route('/documents', methods=['GET', 'POST']) +@roles_accepted('Super User', 'Tenant Admin') +def documents(): + page = request.args.get('page', 1, type=int) + per_page = request.args.get('per_page', 10, type=int) + + query = Document.query.order_by(desc(Document.created_at)).options( + joinedload(Document.languages).joinedload(DocumentLanguage.versions)) + + pagination = query.paginate(page=page, per_page=per_page, error_out=False) + docs = pagination.items + + rows = prepare_document_data(docs) + + return render_template('document/documents.html', rows=rows, pagination=pagination) + + +@document_bp.route('/process_version/', methods=['POST']) +@roles_accepted('Super User', 'Tenant Admin') +def process_version(version_id): + version = DocumentVersion.query.get_or_404(version_id) + if not version.processing: + print(f'Placeholder for processing version: {version_id}') + + return redirect(url_for('documents')) + + +def set_logging_information(obj, timestamp): + obj.created_at = timestamp + obj.updated_at = timestamp + obj.created_by = current_user.id + obj.updated_by = current_user.id + + +def create_language_for_document(document, language): + new_doc_lang = DocumentLanguage() + if language == '': + new_doc_lang.language = session['default_language'] + else: + new_doc_lang.language = language + + new_doc_lang.document = document + + set_logging_information(new_doc_lang, dt.now(tz.utc)) + + return new_doc_lang + + +def upload_file_for_version(doc_vers, file, extension): + error = None + doc_vers.file_type = extension + doc_vers.file_name = doc_vers.calc_file_name() + doc_vers.file_location = doc_vers.calc_file_location() + + upload_path = os.path.join(current_app.config['UPLOAD_FOLDER'], doc_vers.file_location) + if not os.path.exists(upload_path): + os.makedirs(upload_path, exist_ok=True) + file.save(os.path.join(upload_path, doc_vers.file_name)) + try: + db.session.commit() + except Exception as e: + db.session.rollback() + error = e.args + if error is None: + flash('Document saved successfully.', 'success') + current_app.logger.info(f'Starting Doucment processing for tenant {session['tenant']['id']} for document ' + f'version {doc_vers.id}') + # TODO: processing of document to embeddings (async) + flash('Document processing started.', 'info') + else: + flash('Error saving document.', 'error') + current_app.logger.error(f'Error saving document for tenant {session["tenant"]["id"]}: {error}') + + +# Sample code for adding or updating versions and ensuring latest_version is set in DocumentLanguage +# def add_or_update_version(language_id, version_data): +# new_version = Version(language_id=language_id, **version_data) +# db.session.add(new_version) +# db.session.flush() # Ensures new_version gets an ID assigned if it's new +# +# # Assuming we always call this when we know it's the latest +# language = Language.query.get(language_id) +# language.latest_version_id = new_version.id +# db.session.commit() + +# sample code for using latest_version in the application +# @app.route('/language/') +# def show_language(language_id): +# language = Language.query.get_or_404(language_id) +# latest_version = language.latest_version # This is now a direct, efficient database access +# return render_template('language_details.html', language=language, latest_version=latest_version) + + +def prepare_document_data(docs): + rows = [] + for doc in docs: + doc_row = [{'value': doc.name, 'class': '', 'type': 'text'}, + {'value': doc.created_at.strftime("%Y-%m-%d %H:%M:%S"), 'class': '', 'type': 'text'}] + # Document basic details + if doc.valid_from: + doc_row.append({'value': doc.valid_from.strftime("%Y-%m-%d"), 'class': '', 'type': 'text'}) + else: + doc_row.append({'value': '', 'class': '', 'type': 'text'}) + + # Nested languages and versions + languages_rows = [] + for lang in doc.languages: + lang_row = [{'value': lang.language, 'class': '', 'type': 'text'}] + + # Latest version details if available (should be available ;-) ) + if lang.latest_version: + lang_row.append({'value': lang.latest_version.created_at.strftime("%Y-%m-%d %H:%M:%S"), + 'class': '', 'type': 'text'}) + if lang.latest_version.url: + lang_row.append({'value': lang.latest_version.url, + 'class': '', 'type': 'link', 'href': lang.latest_version.url}) + else: + lang_row.append({'value': '', 'class': '', 'type': 'text'}) + + if lang.latest_version.file_name: + lang_row.append({'value': lang.latest_version.file_name, 'class': '', 'type': 'text'}) + else: + lang_row.append({'value': '', 'class': '', 'type': 'text'}) + + if lang.latest_version.file_type: + lang_row.append({'value': lang.latest_version.file_type, 'class': '', 'type': 'text'}) + else: + lang_row.append({'value': '', 'class': '', 'type': 'text'}) + # Include other details as necessary + + languages_rows.append(lang_row) + + doc_row.append({'is_group': True, 'colspan': '4', + 'headers': ['Language', 'Latest Version', 'URL', 'File Name', 'Type'], + 'sub_rows': languages_rows}) + rows.append(doc_row) + return rows diff --git a/eveai_app/views/user_views.py b/eveai_app/views/user_views.py index b6c1fc7..f9b1637 100644 --- a/eveai_app/views/user_views.py +++ b/eveai_app/views/user_views.py @@ -1,7 +1,7 @@ # from . import user_bp import uuid from datetime import datetime as dt, timezone as tz -from flask import request, redirect, url_for, flash, render_template, Blueprint, session +from flask import request, redirect, url_for, flash, render_template, Blueprint, session, current_app from flask_security import hash_password, roles_required, roles_accepted from ..models.user import User, Tenant, Role @@ -83,9 +83,12 @@ def tenant(): # Create schema for new tenant if error is None: + current_app.logger.info(f"Successfully created tenant {new_tenant.id} in Database") + current_app.logger.info(f"Creating schema for tenant {new_tenant.id}") Database(new_tenant.id).create_tenant_schema() - - flash(error) if error else flash('Tenant added successfully.') + else: + current_app.logger.error(f"Error creating tenant: {error}") + flash(error) if error else flash('Tenant added successfully.') form = TenantForm() return render_template('user/tenant.html', form=form) diff --git a/eveai_app/worker/celery_utils.py b/eveai_app/worker/celery_utils.py new file mode 100644 index 0000000..8b6fe4b --- /dev/null +++ b/eveai_app/worker/celery_utils.py @@ -0,0 +1,9 @@ +def init_celery(celery, app): + celery.conf.update(app.config) # Load all configurations form Flask app including Queue settings + + class ContextTask(celery.Task): + def __call__(self, *args, **kwargs): + with app.app_context(): + return self.run(*args, **kwargs) + + celery.Task = ContextTask diff --git a/eveai_app/worker/tasks.py b/eveai_app/worker/tasks.py new file mode 100644 index 0000000..973d224 --- /dev/null +++ b/eveai_app/worker/tasks.py @@ -0,0 +1,59 @@ +from datetime import datetime as dt, timezone as tz +from flask import current_app +from langchain_mistralai import MistralAIEmbeddings +from langchain_openai import OpenAIEmbeddings +from langchain_community.document_loaders.pdf import PyPDFLoader +from langchain_community.vectorstores.chroma import Chroma +from langchain_text_splitters import CharacterTextSplitter +import os + +from eveai_app import celery +from ..utils.database import Database +from ..models.document import DocumentVersion, EmbeddingMistral, EmbeddingSmallOpenAI +from .. import db + + +@celery.task(name='create_embeddings', queue='embeddings') +def create_embeddings(tenant_id, document_version_id, embedding_model_def): + current_app.logger.info(f'Creating embeddings for tenant {tenant_id} on document version {document_version_id} ' + f'with model {embedding_model_def}') + Database(tenant_id).switch_schema() + document_version = DocumentVersion.query.get(document_version_id) + if document_version is None: + current_app.logger.error(f'Cannot create embeddings for tenant {tenant_id}. ' + f'Document version {document_version_id} not found') + return + db.session.add(document_version) + + # start processing + document_version.processing = True + document_version.processing_started_at = dt.now(tz.utc) + db.session.commit() + + embedding_provider = embedding_model_def.rsplit('.', 1)[0] + embedding_model = embedding_model_def.rsplit('.', 1)[1] + # define embedding variables + match (embedding_provider, embedding_model): + case ('openai', 'text-embedding-3-small'): + embedding_model = EmbeddingSmallOpenAI() + case ('mistral', 'text-embedding-3-small'): + embedding_model = EmbeddingMistral() + + match document_version.file_type: + case 'pdf': + pdf_file = os.path.join(current_app.config['UPLOAD_FOLDER'], + document_version.file_location, + document_version.file_path) + loader = PyPDFLoader(pdf_file) + + # We + text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0) + documents = text_splitter.split_documents(loader.load()) + + pass + + +@celery.task(name='ask_eveAI', queue='llm_interactions') +def ask_eve_ai(query): + # Interaction logic with LLMs like GPT (Langchain API calls, etc.) + pass diff --git a/requirements.txt b/requirements.txt index f76c9df..07c9bda 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,6 @@ SQLAlchemy~=2.0.29 alembic~=1.13.1 Werkzeug~=3.0.2 pgvector~=0.2.5 -gevent~=24.2.1 \ No newline at end of file +gevent~=24.2.1 +celery~=5.4.0 +kombu~=5.3.7 \ No newline at end of file