Skip to content

Commit

Permalink
Merge pull request #306 from TogetherCrew/feat/302-telegram-raw-vecto…
Browse files Browse the repository at this point in the history
…rize

feat: Telegram vectorize handling the banned users!
  • Loading branch information
amindadgar authored Oct 21, 2024
2 parents d43750d + e72bff7 commit 5eeb2c4
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,13 @@ def extract(self, from_date: datetime | None = None) -> list[TelegramMessagesMod
first_message AS message,
last_edit.updated_at AS edited_at,
last_edit.text AS message_text
OPTIONAL MATCH (author:TGUser)-[created_rel:CREATED_MESSAGE]->(message)
MATCH (author:TGUser)-[created_rel:CREATED_MESSAGE]->(message)
WHERE NOT EXISTS {{
MATCH (author)-[banned_rel:BANNED]->(c:TGChat {{id: $chat_id}})
MATCH (author)-[joined_rel:JOINED|UNBANNED]->(c)
WITH author, MAX(banned_rel.date) AS banned_time, MAX(joined_rel.date) AS joined_time
WHERE banned_time > joined_time
}}
OPTIONAL MATCH (reacted_user:TGUser)-[react_rel:REACTED_TO]->(message)
OPTIONAL MATCH (reply_msg:TGMessage)-[:REPLIED]->(message)
OPTIONAL MATCH (replied_user:TGUser)-[:CREATED_MESSAGE]->(reply_msg)
Expand Down
8 changes: 3 additions & 5 deletions dags/hivemind_etl_helpers/src/db/telegram/extract/tc_chats.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ class TelegramChats:
def __init__(self) -> None:
self._connection = Neo4jOps.get_instance()

def extract_chats(self) -> list[tuple[str, str]]:
def extract_chats(self) -> list[tuple[int, str]]:
"""
extract the chat id and chat names
Returns
---------
chat_info : list[tuple[str, str]]
chat_info : list[tuple[int, str]]
a list of Telegram chat id and chat name
"""
driver = self._connection.neo4j_driver
Expand All @@ -24,9 +24,7 @@ def extract_chats(self) -> list[tuple[str, str]]:
records = session.run(
"MATCH (c:TGChat) RETURN c.id as chat_id, c.title as name"
)
chat_info = [
(str(record["chat_id"]), str(record["name"])) for record in records
]
chat_info = [(record["chat_id"], record["name"]) for record in records]
except Exception as exp:
logging.error(f"Exception during extracting chat ids. exp: {exp}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def create_platform(self) -> ObjectId:
community_id = ObjectId()
self._client[self.database][self.collection].insert_one(
{
"name": "telegram",
"metadata": {
"id": self.chat_id,
"name": self.chat_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,3 +254,82 @@ def test_extract_multiple_data_with_from_date(self):
data = self.extractor.extract(from_date=datetime(2024, 1, 1))

self.assertEqual(data, [])

def test_extract_single_banned_user(self):
with self.extractor._connection.neo4j_driver.session() as session:
session.run(
"""
CREATE (c:TGChat {id: $chat_id}),
(u1:TGUser {id: '927814807.0', username: 'User One'}),
(u2:TGUser {id: '203678862.0', username: 'User Two'}),
(u1)-[:JOINED {date: $joined_date1}]->(c),
(u2)-[:JOINED {date: $joined_date2}]->(c),
(m1:TGMessage {
id: '3.0',
text: '🎉️️️️️️ Welcome to the TC Ingestion Pipeline',
date: $created_at1,
updated_at: $created_at1
}
),
(m4:TGMessage {
id: '3.0',
text: '🎉️️️️️️ Welcome to the TC Ingestion Pipeline. EDITED MSG',
date: $created_at4,
updated_at: $created_at4
}
),
(m2:TGMessage {
id: '4.0',
text: 'Hi',
date: $created_at2,
updated_at: $created_at2
}
),
(m3:TGMessage {
id: '5.0',
text: 'Reply🫡',
date: $created_at3,
updated_at: $created_at3
}
),
(m1)-[:SENT_IN]->(c),
(m2)-[:SENT_IN]->(c),
(m3)-[:SENT_IN]->(c),
(m4)-[:SENT_IN]->(c),
(u1)-[:CREATED_MESSAGE]->(m1),
(u2)-[:CREATED_MESSAGE]->(m2),
(u2)-[:CREATED_MESSAGE]->(m3),
(m1)-[:EDITED]->(m4),
(m3)-[:REPLIED]->(m1),
(u2)-[:REACTED_TO {new_reaction: '[{"type":"emoji","emoji":"🍓"}]', date: $reaction_date}]->(m1),
(u2)-[:BANNED {date: $banned_date}]->(c)
""",
{
"chat_id": self.chat_id,
"created_at1": 1672531200.0, # Sunday, January 1, 2023 12:00:00 AM
"joined_date1": 1672531100.0, # Saturday, December 31, 2022 11:58:20 PM
"joined_date2": 1672531105.0, # Saturday, December 31, 2022 11:58:25 PM
"created_at4": 1672531205.0, # Sunday, January 1, 2023 12:00:05 AM
"created_at2": 1672617600.0, # Monday, January 2, 2023 12:00:00 AM
"created_at3": 1672704000.0, # Tuesday, January 3, 2023 12:00:00 AM
"reaction_date": 1672790400.0, # Wednesday, January 4, 2023 12:00:00 AM
"banned_date": 1673633100.0, # Friday, January 13, 2023 6:05:00 PM
},
)
data = self.extractor.extract()

self.assertEqual(
data,
[
TelegramMessagesModel(
message_id=3,
message_text="🎉️️️️️️ Welcome to the TC Ingestion Pipeline. EDITED MSG",
author_username="User One",
message_created_at=1672531200.0,
message_edited_at=1672531205.0,
mentions=[],
repliers=["User Two"],
reactors=["User Two"],
),
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def test_extract_chats_single_chat(self):
)

chat_infos = self.tc_chats.extract_chats()
self.assertEqual(chat_infos, [("100000", "test chat")])
self.assertEqual(chat_infos, [(100000, "test chat")])

def test_extract_chats_multiple_chats(self):
neo4j_driver = self.tc_chats._connection.neo4j_driver
Expand Down Expand Up @@ -109,8 +109,8 @@ def test_extract_chats_multiple_chats(self):
self.assertEqual(
chat_ids,
[
("100001", "test chat"),
("100002", "test chat 2"),
("100003", "test chat 3"),
(100001, "test chat"),
(100002, "test chat 2"),
(100003, "test chat 3"),
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ def setUp(self) -> None:
self.telegram_platform.database = "TempPlatforms"
self.client.drop_database(self.telegram_platform.database)

def tearDown(self) -> None:
self.client.drop_database(self.telegram_platform.database)

def test_check_no_platform_available(self):
result = self.telegram_platform.check_platform_existence()
self.assertFalse(result)
Expand All @@ -29,6 +32,7 @@ def test_single_platform_available(self):
self.telegram_platform.collection
].insert_one(
{
"name": "telegram",
"metadata": {
"id": self.chat_id,
"name": self.chat_name,
Expand All @@ -55,6 +59,7 @@ def test_telegram_multiple_platform_not_available(self):
].insert_many(
[
{
"name": "telegram",
"metadata": {
"id": chat_id,
"name": chat_name,
Expand All @@ -65,6 +70,7 @@ def test_telegram_multiple_platform_not_available(self):
"updatedAt": datetime.now(),
},
{
"name": "telegram",
"metadata": {
"id": chat_id2,
"name": chat_name2,
Expand All @@ -75,6 +81,7 @@ def test_telegram_multiple_platform_not_available(self):
"updatedAt": datetime.now(),
},
{
"name": "telegram",
"metadata": {
"id": chat_id3,
"name": chat_name3,
Expand Down
4 changes: 2 additions & 2 deletions dags/hivemind_telegram_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@
) as dag:

@task
def fetch_chat_ids() -> list[tuple[str, str]]:
def fetch_chat_ids() -> list[tuple[int, str]]:
"""
Getting all Telegram chats from the database
Returns
---------
chat_infos : list[tuple[str, str]]
chat_infos : list[tuple[int, str]]
a list of Telegram chat id and name
"""
load_dotenv()
Expand Down

0 comments on commit 5eeb2c4

Please sign in to comment.