Skip to content

Commit

Permalink
Merge pull request #305 from TogetherCrew/feat/302-telegram-raw-vecto…
Browse files Browse the repository at this point in the history
…rize

fix: telegram vectorize - runtime bugs!
  • Loading branch information
amindadgar authored Oct 21, 2024
2 parents 1540439 + 8a2a3c6 commit d43750d
Showing 1 changed file with 32 additions and 12 deletions.
44 changes: 32 additions & 12 deletions dags/hivemind_telegram_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ def fetch_chat_ids() -> list[tuple[str, str]]:
a list of Telegram chat id and name
"""
load_dotenv()
chat_infos = TelegramChats.extract_chats()
logging.info(f"Extracted chats: {chat_infos}")
chat_infos = TelegramChats().extract_chats()
return chat_infos

@task
def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]:
def chat_existence(chat_info: tuple[str, str]) -> dict[str, tuple[str, str] | str]:
"""
check if the community & platform was created for the telegram or not
if not, create a community and platform and hivemind module for it
Expand All @@ -46,7 +45,9 @@ def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]:
Returns
---------
chat_info : tuple[str, str]
details : dict[str, tuple[str, str] | str]
the chat details containing the chat_info
and a community id related to that
tuple containing telegram chat id and chat name
"""
chat_id = chat_info[0]
Expand All @@ -62,20 +63,33 @@ def chat_existence(chat_info: tuple[str, str]) -> tuple[str, str]:

community_id = utils.create_platform()

return chat_info, community_id
return {
"chat_info": chat_info,
"community_id": str(community_id),
}

@task
def processor(chat_info: tuple[str, str], community_id: str) -> None:
def processor(
details: dict[str, tuple[str, str] | str],
) -> None:
"""
extract, transform, and load telegram data
Parameters
-----------
chat_id : str
a telegram chat id
community_id : str
the community id, related the created community
details : dict[str, tuple[str, str] | str]
the chat details containing the chat_info
and a community id related to that
tuple containing telegram chat id and chat name
"""
load_dotenv()
logging.info(f"received details: {details}!")
# unwrapping data
chat_info = details["chat_info"]
community_id = details["community_id"]

logging.info(f"Started processing community: {community_id}")

chat_id = chat_info[0]
chat_name = chat_info[1]

Expand All @@ -92,12 +106,18 @@ def processor(chat_info: tuple[str, str], community_id: str) -> None:
if latest_date:
# this is to catch any edits for messages of 30 days ago
from_date = latest_date - timedelta(days=30)
logging.info(f"Started extracting from date: {from_date}!")
messages = extractor.extract(from_date=from_date)
else:
logging.info("Started extracting data from scratch!")
messages = extractor.extract()

logging.info(f"Extracted {len(messages)} messages!")
documents = transformer.transform(messages=messages)
logging.info(f"{len(messages)} Messages transformed!")
ingestion_pipeline.run_pipeline(docs=documents)
logging.info("Finished loading into database!")

chat_infos = fetch_chat_ids()
chat_info, community_id = chat_existence.expand(chat_info=chat_infos)
processor(chat_info=chat_info, community_id=community_id)
details = chat_existence.expand(chat_info=chat_infos)
processor.expand(details=details)

0 comments on commit d43750d

Please sign in to comment.