Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data Ingestion Improvement/Cleanup/Bug Fix - Part 2 #307

Merged
merged 6 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -1 +1 @@
FROM quay.io/astronomer/astro-runtime:10.0.0
FROM quay.io/astronomer/astro-runtime:10.1.0
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-forum-load.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ def get_astro_forum_content():
),
)
def ask_astro_load_astro_forum():
from include.tasks import split
from include.tasks import chunking_utils

split_docs = task(split.split_html).expand(dfs=[get_astro_forum_content()])
split_docs = task(chunking_utils.split_html).expand(dfs=[get_astro_forum_content()])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-airflow-docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ def ask_astro_load_airflow_docs():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import airflow_docs

extracted_airflow_docs = task(split.split_html).expand(
extracted_airflow_docs = task(chunking_utils.split_html).expand(
dfs=[airflow_docs.extract_airflow_docs(docs_base_url=airflow_docs_base_url)]
)

Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-astro-cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ def ask_astro_load_astro_cli_docs():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import astro_cli_docs

extract_astro_cli_docs = task(astro_cli_docs.extract_astro_cli_docs)()
split_md_docs = task(split.split_html).expand(dfs=[extract_astro_cli_docs])
split_md_docs = task(chunking_utils.split_html).expand(dfs=[extract_astro_cli_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
6 changes: 5 additions & 1 deletion airflow/dags/ingestion/ask-astro-load-astro-sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def get_astro_sdk_content():
),
)
def ask_astro_load_astro_sdk():
from include.tasks import chunking_utils

split_docs = task(chunking_utils.split_html).expand(dfs=[get_astro_sdk_content()])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
Expand All @@ -45,7 +49,7 @@ def ask_astro_load_astro_sdk():
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[get_astro_sdk_content()])
).expand(input_data=[split_docs])


ask_astro_load_astro_sdk()
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-astronomer-docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ def ask_astro_load_astronomer_docs():
"""
This DAG performs incremental load for any new docs in astronomer docs.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract.astro_docs import extract_astro_docs

astro_docs = task(extract_astro_docs)()

split_html_docs = task(split.split_html).expand(dfs=[astro_docs])
split_html_docs = task(chunking_utils.split_html).expand(dfs=[astro_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def ask_astro_load_astronomer_providers():
any existing documents that have been updated will be removed and re-added.
"""

from include.tasks import chunking_utils

split_docs = task(chunking_utils.split_html).expand(dfs=[get_provider_content()])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
existing="replace",
Expand All @@ -51,7 +55,7 @@ def ask_astro_load_astronomer_providers():
verbose=True,
conn_id=_WEAVIATE_CONN_ID,
task_id="WeaviateDocumentIngestOperator",
).expand(input_data=[get_provider_content()])
).expand(input_data=[split_docs])


ask_astro_load_astronomer_providers()
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-blogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ def ask_astro_load_blogs():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import blogs

blogs_docs = task(blogs.extract_astro_blogs)(blog_cutoff_date=blog_cutoff_date)

split_md_docs = task(split.split_markdown).expand(dfs=[blogs_docs])
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[blogs_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-cosmos-docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ def ask_astro_load_cosmos_docs():
any existing documents that have been updated will be removed and re-added.
"""

from include.tasks import split
from include.tasks import chunking_utils

split_docs = task(split.split_html).expand(dfs=[extract_cosmos_docs()])
split_docs = task(chunking_utils.split_html).expand(dfs=[extract_cosmos_docs()])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-github.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def ask_astro_load_github():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import github

md_docs = (
Expand All @@ -58,7 +58,7 @@ def ask_astro_load_github():
.expand(repo_base=issues_docs_sources)
)

split_md_docs = task(split.split_markdown).expand(dfs=[md_docs, issues_docs])
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[md_docs, issues_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
6 changes: 3 additions & 3 deletions airflow/dags/ingestion/ask-astro-load-registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ def ask_astro_load_registry():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import registry

registry_cells_docs = task(registry.extract_astro_registry_cell_types)()

registry_dags_docs = task(registry.extract_astro_registry_dags)()

split_md_docs = task(split.split_markdown).expand(dfs=[registry_cells_docs])
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[registry_cells_docs])

split_code_docs = task(split.split_python).expand(dfs=[registry_dags_docs])
split_code_docs = task(chunking_utils.split_python).expand(dfs=[registry_dags_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ def ask_astro_load_slack():
DAG should run nightly to capture threads between archive periods. By using the upsert logic of the
weaviate_import decorator any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import slack

slack_docs = task(slack.extract_slack).expand(source=slack_channel_sources)

split_md_docs = task(split.split_markdown).expand(dfs=[slack_docs])
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[slack_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-stackoverflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def ask_astro_load_stackoverflow():
data from a point-in-time data capture. By using the upsert logic of the weaviate_import decorator
any existing documents that have been updated will be removed and re-added.
"""
from include.tasks import split
from include.tasks import chunking_utils
from include.tasks.extract import stack_overflow

stack_overflow_docs = (
Expand All @@ -47,7 +47,7 @@ def ask_astro_load_stackoverflow():
.expand(tag=stackoverflow_tags)
)

split_md_docs = task(split.split_markdown).expand(dfs=[stack_overflow_docs])
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=[stack_overflow_docs])

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
8 changes: 4 additions & 4 deletions airflow/dags/ingestion/ask-astro-load.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def ask_astro_load_bulk():

"""

from include.tasks import split
from include.tasks import chunking_utils

@task
def get_schema_and_process(schema_file: str) -> list:
Expand Down Expand Up @@ -432,11 +432,11 @@ def import_baseline(

python_code_tasks = [registry_dags_docs]

split_md_docs = task(split.split_markdown).expand(dfs=markdown_tasks)
split_md_docs = task(chunking_utils.split_markdown).expand(dfs=markdown_tasks)

split_code_docs = task(split.split_python).expand(dfs=python_code_tasks)
split_code_docs = task(chunking_utils.split_python).expand(dfs=python_code_tasks)

split_html_docs = task(split.split_html).expand(dfs=html_tasks)
split_html_docs = task(chunking_utils.split_html).expand(dfs=html_tasks)

_import_data = WeaviateDocumentIngestOperator.partial(
class_name=WEAVIATE_CLASS,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
from __future__ import annotations

import logging

import pandas as pd
import tiktoken
from langchain.schema import Document
from langchain.text_splitter import (
Language,
RecursiveCharacterTextSplitter,
)
from langchain_community.document_transformers import Html2TextTransformer

logger = logging.getLogger("airflow.task")

TARGET_CHUNK_SIZE = 2500


def enforce_max_token_len(text: str) -> str:
encoding = tiktoken.get_encoding("cl100k_base")
encoded_text = encoding.encode(text)
if len(encoded_text) > 8191:
logger.info("Token length of string exceeds the max content length of the tokenizer. Truncating...")
return encoding.decode(encoded_text[:8191])
return text


def split_markdown(dfs: list[pd.DataFrame]) -> pd.DataFrame:
"""
Expand All @@ -27,11 +42,25 @@ def split_markdown(dfs: list[pd.DataFrame]) -> pd.DataFrame:

df = pd.concat(dfs, axis=0, ignore_index=True)

splitter = RecursiveCharacterTextSplitter(chunk_size=4000, chunk_overlap=200, separators=["\n\n", "\n", " ", ""])
# directly from the langchain splitter library
separators = ["\n\n", "\n", " ", ""]
splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
# cl100k_base is used for text ada 002 and later embedding models
encoding_name="cl100k_base",
chunk_size=TARGET_CHUNK_SIZE,
chunk_overlap=200,
separators=separators,
is_separator_regex=True,
)

df["doc_chunks"] = df["content"].apply(lambda x: splitter.split_documents([Document(page_content=x)]))
df = df.explode("doc_chunks", ignore_index=True)
df["content"] = df["doc_chunks"].apply(lambda x: x.page_content)

# Remove blank doc chunks
df = df[~df["content"].apply(lambda x: x.isspace() or x == "")]

df["content"] = df["content"].apply(enforce_max_token_len)
df.drop(["doc_chunks"], inplace=True, axis=1)
df.reset_index(inplace=True, drop=True)

Expand All @@ -56,15 +85,36 @@ def split_python(dfs: list[pd.DataFrame]) -> pd.DataFrame:

df = pd.concat(dfs, axis=0, ignore_index=True)

splitter = RecursiveCharacterTextSplitter.from_language(
language=Language.PYTHON,
# chunk_size=50,
chunk_overlap=0,
# directly from the langchain splitter library
python_separators = [
# First, try to split along class definitions
"\nclass ",
"\ndef ",
"\n\tdef ",
# Now split by the normal type of lines
"\n\n",
"\n",
" ",
"",
]

splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
# cl100k_base is used for text ada 002 and later embedding models
encoding_name="cl100k_base",
chunk_size=TARGET_CHUNK_SIZE,
chunk_overlap=200,
separators=python_separators,
is_separator_regex=True,
)

df["doc_chunks"] = df["content"].apply(lambda x: splitter.split_documents([Document(page_content=x)]))
df = df.explode("doc_chunks", ignore_index=True)
df["content"] = df["doc_chunks"].apply(lambda x: x.page_content)

# Remove blank doc chunks
df = df[~df["content"].apply(lambda x: x.isspace() or x == "")]

df["content"] = df["content"].apply(enforce_max_token_len)
df.drop(["doc_chunks"], inplace=True, axis=1)
df.reset_index(inplace=True, drop=True)

Expand Down Expand Up @@ -93,9 +143,10 @@ def split_html(dfs: list[pd.DataFrame]) -> pd.DataFrame:
splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
# cl100k_base is used for text ada 002 and later embedding models
encoding_name="cl100k_base",
chunk_size=4000,
chunk_size=TARGET_CHUNK_SIZE,
chunk_overlap=200,
separators=separators,
is_separator_regex=True,
)

# Split by chunking first
Expand All @@ -110,17 +161,8 @@ def split_html(dfs: list[pd.DataFrame]) -> pd.DataFrame:
# Remove blank doc chunks
df = df[~df["content"].apply(lambda x: x.isspace() or x == "")]

df["content"] = df["content"].apply(enforce_max_token_len)
df.drop(["doc_chunks"], inplace=True, axis=1)
df.reset_index(inplace=True, drop=True)

return df


def split_list(urls: list[str], chunk_size: int = 0) -> list[list[str]]:
"""
split the list of string into chunk of list of string

param urls: URL list we want to chunk
param chunk_size: Max size of chunked list
"""
return [urls[i : i + chunk_size] for i in range(0, len(urls), chunk_size)]
5 changes: 2 additions & 3 deletions airflow/include/tasks/extract/airflow_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@
import urllib.parse

import pandas as pd
import requests
from bs4 import BeautifulSoup
from weaviate.util import generate_uuid5

from airflow.decorators import task
from include.tasks.extract.utils.html_utils import get_internal_links
from include.tasks.extract.utils.html_utils import fetch_page_content, get_internal_links


@task
Expand Down Expand Up @@ -46,7 +45,7 @@ def extract_airflow_docs(docs_base_url: str) -> list[pd.DataFrame]:

df = pd.DataFrame(docs_links, columns=["docLink"])

df["html_content"] = df["docLink"].apply(lambda x: requests.get(x).content)
df["html_content"] = df["docLink"].apply(fetch_page_content)

df["content"] = df["html_content"].apply(
lambda x: str(BeautifulSoup(x, "html.parser").find(class_="body", role="main"))
Expand Down
Loading
Loading