
Creating notebook to ingest CloudSQL database using kubernetes docs #751

Open
wants to merge 36 commits into base: main
Commits
9b05372
Creating notebook to ingest CloudSQL database using kubernetes
german-grandas Jul 26, 2024
015d3ff
Running rag e2e test with kubernetes docs.
german-grandas Sep 18, 2024
8f48578
Updating branch with main
german-grandas Sep 18, 2024
d05e198
Reverting change
german-grandas Sep 18, 2024
67fa74b
Fixing issue converting notebook to script.
german-grandas Sep 18, 2024
8197739
Update cloudbuild.yaml to fix generation of script.
german-grandas Sep 18, 2024
f54e7ed
updating notebook variables
german-grandas Sep 18, 2024
34e1d5d
Merge branch 'add/example_notebooks/kubernetes_docs' of https://githu…
german-grandas Sep 18, 2024
37b5024
Adding iptype configuration to db engine
german-grandas Sep 19, 2024
b046e93
Updating Rag application README
german-grandas Sep 19, 2024
9852c43
Updating branch with main
german-grandas Sep 23, 2024
6575552
updating cloudbuild to run tests on kubernetes docs instead of the ne…
german-grandas Sep 23, 2024
f0482f6
Fixing issue with cloudbuild file
german-grandas Sep 23, 2024
4da233c
adding missing import
german-grandas Sep 23, 2024
c44b28c
adding missing os package
german-grandas Sep 23, 2024
8063d7c
Updating notebook to ingest database using kubernetes
german-grandas Sep 24, 2024
a109c8c
Fixing issue on notebook with document processing
german-grandas Sep 24, 2024
d52a850
Updating dlp templates on test_rag.py
german-grandas Sep 24, 2024
e15a388
Reviewing suggested comments
german-grandas Oct 4, 2024
2a21fb5
Merge branch 'main' of https://github.com/GoogleCloudPlatform/ai-on-g…
german-grandas Oct 4, 2024
bf13358
updating dataset_embeddings_table_name
german-grandas Oct 7, 2024
15c5dc8
Merge branch 'main' of https://github.com/GoogleCloudPlatform/ai-on-g…
german-grandas Oct 7, 2024
39837b5
updating notebook, deleting metadata cache
german-grandas Oct 7, 2024
41f4a75
updating notebook formatting
german-grandas Oct 8, 2024
d7f9165
removing conflicting package, updating embeddings importing
german-grandas Oct 8, 2024
41b7bea
fixing import on notebook
german-grandas Oct 8, 2024
edd5f8f
Reverting change on notebook
german-grandas Oct 8, 2024
df11fa3
Refactoring notebook so ray can be used, updating cloudbuild.yml file.
german-grandas Oct 15, 2024
d92a9d9
Merge branch 'main' of https://github.com/GoogleCloudPlatform/ai-on-g…
german-grandas Oct 15, 2024
c7bfb03
Updating based on comments
german-grandas Oct 22, 2024
47d9c34
Merge branch 'main' of https://github.com/GoogleCloudPlatform/ai-on-g…
german-grandas Oct 22, 2024
bba5207
Working on comments
german-grandas Oct 25, 2024
aea0f7d
Merge branch 'main' of https://github.com/GoogleCloudPlatform/ai-on-g…
german-grandas Oct 25, 2024
610658e
Reverting change adding new variable to kuberay
german-grandas Oct 25, 2024
a91fe1d
Fixing issue with ray working_dir
german-grandas Oct 28, 2024
57db05c
Merge branch 'main' of https://github.com/GoogleCloudPlatform/ai-on-g…
german-grandas Oct 28, 2024
32 changes: 6 additions & 26 deletions applications/rag/README.md
@@ -17,7 +17,7 @@ RAG uses a semantically searchable knowledge base (like vector search) to retrie
5. A [Jupyter](https://docs.jupyter.org/en/latest/) notebook running on GKE that reads the dataset using GCS fuse driver integrations and runs a Ray job to populate the vector DB.
3. A front end chat interface running on GKE that prompts the inference server with context from the vector DB.

This tutorial walks you through installing the RAG infrastructure in a GCP project, generating vector embeddings for a sample [Kaggle Netflix shows](https://www.kaggle.com/datasets/shivamb/netflix-shows) dataset and prompting the LLM with context.
This tutorial walks you through installing the RAG infrastructure in a GCP project, generating vector embeddings for a sample [Kubernetes Docs](https://github.com/dohsimpson/kubernetes-doc-pdf) dataset and prompting the LLM with context.

# Prerequisites

@@ -74,7 +74,7 @@ This section sets up the RAG infrastructure in your GCP project using Terraform.

# Generate vector embeddings for the dataset

This section generates the vector embeddings for your input dataset. Currently, the default dataset is [Netflix shows](https://www.kaggle.com/datasets/shivamb/netflix-shows). We will use a Jupyter notebook to run a Ray job that generates the embeddings & populates them into the `pgvector` instance created above.
This section generates the vector embeddings for your input dataset. Currently, the default dataset is [Kubernetes docs](https://github.com/dohsimpson/kubernetes-doc-pdf). We will use a Jupyter notebook to generate the embeddings & populate them into the `pgvector` instance created above.

Set the namespace, cluster name and location from `workloads.tfvars`:

@@ -108,30 +108,10 @@ gcloud container clusters get-credentials ${CLUSTER_NAME} --location=${CLUSTER_L

2. Load the notebook:
- Once logged in to JupyterHub, choose the `CPU` preset with `Default` storage.
- Click [File] -> [Open From URL] and paste: `https://raw.githubusercontent.com/GoogleCloudPlatform/ai-on-gke/main/applications/rag/example_notebooks/rag-kaggle-ray-sql-interactive.ipynb`

3. Configure Kaggle:
- Create a [Kaggle account](https://www.kaggle.com/account/login?phase=startRegisterTab&returnUrl=%2F).
- [Generate an API token](https://www.kaggle.com/settings/account). See [further instructions](https://www.kaggle.com/docs/api#authentication). This token is used in the notebook to access the [Kaggle Netflix shows](https://www.kaggle.com/datasets/shivamb/netflix-shows) dataset.
- Replace the variables in the 1st cell of the notebook with your Kaggle credentials (can be found in the `kaggle.json` file created while generating the API token):
* `KAGGLE_USERNAME`
* `KAGGLE_KEY`

4. Generate vector embeddings: Run all the cells in the notebook to generate vector embeddings for the Netflix shows dataset (https://www.kaggle.com/datasets/shivamb/netflix-shows) and store them in the `pgvector` CloudSQL instance via a Ray job.
* When the last cell says the job has succeeded (eg: `Job 'raysubmit_APungAw6TyB55qxk' succeeded`), the vector embeddings have been generated and we can launch the frontend chat interface. Note that running the job can take up to 10 minutes.
* Ray may take several minutes to create the runtime environment. During this time, the job will appear to be missing (e.g. `Status message: PENDING`).
* Connect to the Ray dashboard to check the job status or logs:
- If IAP is disabled (`ray_dashboard_add_auth = false`):
- `kubectl port-forward -n ${NAMESPACE} service/ray-cluster-kuberay-head-svc 8265:8265`
- Go to `localhost:8265` in a browser
- If IAP is enabled (`ray_dashboard_add_auth = true`):
- Fetch the domain: `terraform output ray-dashboard-managed-cert`
- If you used a custom domain, ensure you configured your DNS as described above.
- Verify the domain status is `Active`:
- `kubectl get managedcertificates ray-dashboard-managed-cert -n ${NAMESPACE} --output jsonpath='{.status.domainStatus[0].status}'`
- Note: This can take up to 20 minutes to propagate.
- Once the domain status is Active, go to the domain in a browser and login with your Google credentials.
- To add additional users to your frontend application, go to [Google Cloud Platform IAP](https://console.cloud.google.com/security/iap), select the `rag/ray-cluster-kuberay-head-svc` service and add principals with the role `IAP-secured Web App User`.
- Click [File] -> [Open From URL] and paste: `https://raw.githubusercontent.com/GoogleCloudPlatform/ai-on-gke/main/applications/rag/example_notebooks/rag-data-ingest-with-kubernetes-docs.ipynb`


3. Generate vector embeddings: Run all the cells in the notebook to generate vector embeddings for the [Kubernetes documentation](https://github.com/dohsimpson/kubernetes-doc-pdf) and store them in the `pgvector` CloudSQL instance using a Ray Job.
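
   To spot-check that the embeddings were stored, here is a minimal sketch you could run from a notebook cell (assumptions: the default `rag_embeddings_db` table, the `pgvector-database` database, and the same secret-volume credentials the job uses):

   ```python
   # Minimal verification sketch; mirrors the connection setup used by the ingestion job.
   import os
   import sqlalchemy
   from google.cloud.sql.connector import Connector, IPTypes

   instance = os.environ["CLOUDSQL_INSTANCE_CONNECTION_NAME"]
   with open("/etc/secret-volume/username") as f:
       db_user = f.read()
   with open("/etc/secret-volume/password") as f:
       db_pass = f.read()

   connector = Connector()

   def getconn():
       return connector.connect(
           instance, "pg8000", user=db_user, password=db_pass,
           db="pgvector-database", ip_type=IPTypes.PRIVATE,
       )

   engine = sqlalchemy.create_engine("postgresql+pg8000://", creator=getconn)
   with engine.connect() as conn:
       count = conn.execute(
           sqlalchemy.text("SELECT COUNT(*) FROM rag_embeddings_db")
       ).scalar()
   print(f"Stored embeddings: {count}")
   ```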

# Launch the frontend chat interface

291 changes: 291 additions & 0 deletions applications/rag/example_notebooks/rag-data-ingest-with-kubernetes-docs.ipynb
@@ -0,0 +1,291 @@
{
"cells": [
{
"cell_type": "markdown",
"id": "7e14d0f0-2573-4fe4-ba87-7a447f2f511c",
"metadata": {},
"source": [
"# RAG-on-GKE Application\n",
"\n",
"This is a Python notebook for generating the vector embeddings based on [Kubernetes docs](https://github.com/dohsimpson/kubernetes-doc-pdf/) used by the RAG on GKE application. \n",
"For full information, please checkout the GitHub documentation [here](https://github.com/GoogleCloudPlatform/ai-on-gke/blob/main/applications/rag/README.md).\n"
]
},
{
"cell_type": "markdown",
"id": "2cba26cf",
"metadata": {},
"source": [
"## Clone the kubernetes docs repo"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5f9b1fad-537e-425f-a5fc-587a408b1fab",
"metadata": {},
"outputs": [],
"source": [
"!mkdir /data/kubernetes-docs -p\n",
"!git clone https://github.com/dohsimpson/kubernetes-doc-pdf /data/kubernetes-docs\n"
]
},
{
"cell_type": "markdown",
"id": "b984429c-b65a-47b7-9723-ee3ad81d61db",
"metadata": {},
"source": [
"## Install the required packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "40e4d29d-79c6-4233-a8ed-0f8a42576656",
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"!pip install langchain langchain-community sentence_transformers pypdf"
]
},
{
"cell_type": "markdown",
"id": "f80cc5af-a1fa-456d-a4ed-fa2ffa3b87a0",
"metadata": {},
"source": [
"## Writting job to be used on the Ray Cluster"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "36523f3f-0c93-41da-abb9-c113bb456bc1",
"metadata": {},
"outputs": [],
"source": [
"# Create a directory to package the contents that need to be downloaded in ray worker\n",
"! mkdir -p rag-app"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "69d912e5-2225-4b44-80cd-651f7cc71a40",
"metadata": {},
"outputs": [],
"source": [
"%%writefile rag-app/job.py\n",
"\n",
"import os\n",
"import uuid\n",
"import glob\n",
"\n",
"from langchain.text_splitter import RecursiveCharacterTextSplitter\n",
"from langchain.embeddings import HuggingFaceEmbeddings\n",
"from langchain_community.document_loaders import PyPDFLoader\n",
"\n",
"from google.cloud.sql.connector import Connector, IPTypes\n",
"import sqlalchemy\n",
"\n",
"from sqlalchemy.ext.declarative import declarative_base\n",
"from sqlalchemy import Column, String, Text, text, JSON\n",
"from sqlalchemy.orm import scoped_session, sessionmaker, mapped_column\n",
"from pgvector.sqlalchemy import Vector\n",
"\n",
"# initialize parameters\n",
"\n",
"INSTANCE_CONNECTION_NAME = os.environ[\"CLOUDSQL_INSTANCE_CONNECTION_NAME\"]\n",
"print(f\"Your instance connection name is: {INSTANCE_CONNECTION_NAME}\")\n",
"VECTOR_EMBEDDINGS_TABLE_NAME = \"rag_embeddings_db\"\n",
"DB_NAME = \"pgvector-database\"\n",
"\n",
"db_username_file = open(\"/etc/secret-volume/username\", \"r\")\n",
"DB_USER = db_username_file.read()\n",
"db_username_file.close()\n",
"\n",
"db_password_file = open(\"/etc/secret-volume/password\", \"r\")\n",
"DB_PASS = db_password_file.read()\n",
"db_password_file.close()\n",
"\n",
"# initialize Connector object\n",
"connector = Connector()\n",
"\n",
"# function to return the database connection object\n",
"def getconn():\n",
" conn = connector.connect(\n",
" INSTANCE_CONNECTION_NAME,\n",
" \"pg8000\",\n",
" user=DB_USER,\n",
" password=DB_PASS,\n",
" db=DB_NAME,\n",
" ip_type=IPTypes.PRIVATE\n",
" )\n",
" return conn\n",
"\n",
"# create connection pool with 'creator' argument to our connection object function\n",
"pool = sqlalchemy.create_engine(\n",
" \"postgresql+pg8000://\",\n",
" creator=getconn,\n",
")\n",
"\n",
"Base = declarative_base()\n",
"DBSession = scoped_session(sessionmaker())\n",
"\n",
"class TextEmbedding(Base):\n",
" __tablename__ = VECTOR_EMBEDDINGS_TABLE_NAME\n",
" id = Column(String(255), primary_key=True)\n",
" text = Column(Text)\n",
" text_embedding = mapped_column(Vector(384))\n",
"\n",
"with pool.connect() as conn:\n",
" conn.execute(text(\"CREATE EXTENSION IF NOT EXISTS vector\"))\n",
" conn.commit() \n",
" \n",
"DBSession.configure(bind=pool, autoflush=False, expire_on_commit=False)\n",
"Base.metadata.drop_all(pool)\n",
"Base.metadata.create_all(pool)\n",
"\n",
"SENTENCE_TRANSFORMER_MODEL = \"intfloat/multilingual-e5-small\" # Transformer to use for converting text chunks to vector embeddings\n",
"\n",
"# the dataset has been pre-dowloaded to the GCS bucket as part of the notebook in the cell above. Ray workers will find the dataset readily mounted.\n",
"SHARED_DATASET_BASE_PATH = \"/data/kubernetes-docs/\"\n",
"\n",
"CHUNK_SIZE = 1000 # text chunk sizes which will be converted to vector embeddings\n",
"CHUNK_OVERLAP = 10\n",
"VECTOR_DIMENSION = 384 # Embeddings size\n",
"\n",
"splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP, length_function=len)\n",
"embeddings_service = HuggingFaceEmbeddings(model_name=SENTENCE_TRANSFORMER_MODEL)\n",
"\n",
"def process_pdf(file_path):\n",
" \"\"\"Loads, splits and embed a single PDF file.\"\"\"\n",
" loader = PyPDFLoader(file_path)\n",
" print(f\"Loading {file_path}\")\n",
" pages = loader.load_and_split()\n",
" \n",
" splits = splitter.split_documents(pages)\n",
"\n",
" chunks = []\n",
Collaborator comment: The variable name `chunks` is confusing here; it sounds more like the raw data chunks before embedding.

" for split in splits:\n",
" id = uuid.uuid4()\n",
" page_content = split.page_content\n",
" file_metadata = split.metadata\n",
" embedded_document = embeddings_service.embed_query(page_content)\n",
" split_data = {\n",
" \"langchain_id\" : id,\n",
" \"content\" : page_content,\n",
" \"embedding\" : embedded_document,\n",
" \"langchain_metadata\" : file_metadata\n",
Collaborator comment on lines +177 to +180: Can this even work? The keys of `split_data` do not match the schema of `TextEmbedding`.
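
For reference, `bulk_insert_mappings` matches dictionary keys against the mapped attribute names, so a sketch of a mapping that would line up with the `TextEmbedding` model declared above (illustrative only, not part of this PR; the helper name is hypothetical):

```python
import uuid

# Keys must match the mapped attribute names of TextEmbedding
# (id, text, text_embedding) for bulk_insert_mappings to insert rows.
def to_mapping(split, embedding):
    return {
        "id": str(uuid.uuid4()),      # String(255) primary key
        "text": split.page_content,   # Text column
        "text_embedding": embedding,  # Vector(384) column
    }
```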

" }\n",
" chunks.append(split_data)\n",
" return chunks\n",
"\n",
"documents_file_paths = glob.glob(f\"{SHARED_DATASET_BASE_PATH}/PDFs/*.pdf\")\n",
"for file_path in documents_file_paths:\n",
" processed_result = process_pdf(file_path)\n",
" DBSession.bulk_insert_mappings(TextEmbedding, processed_result)\n",
" \n",
"DBSession.commit()\n",
"\n",
"#Verifying the results.\n",
"\n",
"query_text = \"What's kubernetes?\" \n",
"query_emb = embeddings_service.embed_query(query_text).tolist()\n",
"query_request = \"SELECT id, text, text_embedding, 1 - ('[\" + \",\".join(map(str, query_emb)) + \"]' <=> text_embedding) AS cosine_similarity FROM \" + TABLE_NAME + \" ORDER BY cosine_similarity DESC LIMIT 5;\" \n",
"query_results = DBSession.execute(sqlalchemy.text(query_request)).fetchall()\n",
"DBSession.commit()\n",
"print(\"print query_results, the 1st one is the hit\")\n",
"for row in query_results:\n",
" print(row)\n",
"\n",
"print (\"end job\")"
]
},
{
"cell_type": "markdown",
"id": "6b9bc582-50cd-4d7c-b5c4-549626fd2349",
"metadata": {},
"source": [
"## Summiting the job into Ray Cluster:"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d5b6acbe-5a14-4bc8-a4ca-58a6b3dd5391",
"metadata": {},
"outputs": [],
"source": [
"import ray, time\n",
"from ray.job_submission import JobSubmissionClient\n",
"client = JobSubmissionClient(\"ray://ray-cluster-kuberay-head-svc:10001\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4eb8eae9-2a20-4c02-ac79-196942ae2783",
"metadata": {},
"outputs": [],
"source": [
"# Port forward to the Ray dashboard and go to `localhost:8265` in a browser to see job status: kubectl port-forward -n <namespace> service/ray-cluster-kuberay-head-svc 8265:8265\n",
"import time\n",
"\n",
"start_time = time.time()\n",
"job_id = client.submit_job(\n",
" entrypoint=\"python job.py\",\n",
" # Path to the local directory that contains the entrypoint file.\n",
" runtime_env={\n",
" \"working_dir\": \"/home/jovyan/rag-app\", # upload the local working directory to ray workers\n",
Collaborator (@gongmax, Oct 24, 2024): This seems to work, but can you help me understand why the working_dir looks like this, specifically the jovyan part?

Collaborator Author: I wasn't sure, so I took the same approach as the existing notebook, where the working dir is set up as `/home/jovyan/rag-app`; if that is updated, this could just be `/home/rag-app`.

Collaborator: Regarding this, I think we prefer the interactive pattern which rag-kaggle-ray-sql-interactive.ipynb follows. It would be great if we can follow that pattern in this notebook.
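
For context, a minimal sketch of that interactive pattern (illustrative only, not part of this PR; it assumes a Ray client connection to the same head service and the same pip dependencies as the job above):

```python
# Sketch of the interactive pattern: connect a Ray client session and run the
# ingestion logic as remote tasks instead of submitting job.py as a file.
import ray

ray.init(
    address="ray://ray-cluster-kuberay-head-svc:10001",
    runtime_env={
        "pip": ["langchain", "langchain-community", "sentence-transformers",
                "pypdf", "pgvector"],
    },
)

@ray.remote
def ingest(pdf_path: str) -> int:
    # the process_pdf / database-insert logic from job.py would live here
    return 0

refs = [ingest.remote(p) for p in ["/data/kubernetes-docs/PDFs/example.pdf"]]  # illustrative path
print(ray.get(refs))
ray.shutdown()
```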

" \"pip\": [ \n",
" \"langchain\",\n",
" \"langchain-community\",\n",
" \"sentence-transformers\",\n",
" \"pypdf\",\n",
" \"pgvector\"\n",
" ]\n",
" }\n",
")\n",
"\n",
"# The Ray job typically takes 5m-10m to complete.\n",
"print(\"Job submitted with ID:\", job_id)\n",
"while True:\n",
" status = client.get_job_status(job_id)\n",
" print(\"Job status:\", status)\n",
" print(\"Job info:\", client.get_job_info(job_id).message)\n",
" if status.is_terminal():\n",
" break\n",
" time.sleep(30)\n",
"\n",
"end_time = time.time()\n",
"job_duration = end_time - start_time\n",
"print(f\"Job completed in {job_duration} seconds.\")\n",
"\n",
"ray.shutdown()"
]
}
],
Collaborator comment: Similar to what we did before, can we add a cell to verify the embeddings got created and stored in the database correctly?

Collaborator Author: Added on line 194.

"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
2 changes: 1 addition & 1 deletion applications/rag/metadata.yaml
@@ -70,7 +70,7 @@ spec:
- name: dataset_embeddings_table_name
description: Name of the table that stores vector embeddings for input dataset
varType: string
defaultValue: netflix_reviews_db
defaultValue: rag_embeddings_db
- name: disable_ray_cluster_network_policy
description: Disables Kubernetes Network Policy for Ray Clusters for this demo. Defaulting to 'true' aka disabled pending fixes to the kuberay-monitoring module. This should be defaulted to false.
varType: bool