# init_db.py
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from private import db_details
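# db_details lives in the untracked `private` module. It is splatted
# straight into psycopg2.connect(), so it is assumed to be a dict of
# connection keyword arguments, e.g.
# {'user': ..., 'password': ..., 'host': ..., 'port': ...}.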


def create_database_and_tables():
    subreddits = ['cryptocurrency']
    con = psycopg2.connect(**db_details)
    # CREATE DATABASE cannot run inside a transaction block,
    # so switch the connection to autocommit first.
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    with con.cursor() as cur:
        # Create the reddit db; all tables below are stored inside it.
        cur.execute(
            'CREATE DATABASE "reddit"'
        )
    # A psycopg2 connection is bound to one database, so after creating
    # "reddit" we have to open a new connection to it before we can
    # create tables inside it.
    con = psycopg2.connect(**db_details, dbname='reddit')
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    with con.cursor() as cur:
        # Create the main tables: one per subreddit, each holding all
        # of that subreddit's submissions and comments.
        cols = """
            _id VARCHAR PRIMARY KEY,
            created_utc INT,
            title VARCHAR,
            body VARCHAR,
            selftext VARCHAR
        """
        for subreddit in subreddits:
            cur.execute(
                f'DROP TABLE IF EXISTS {subreddit};'
                f'CREATE TABLE IF NOT EXISTS {subreddit} ({cols})'
            )
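        # For subreddits = ['cryptocurrency'], the two concatenated
        # f-strings above render to:
        #   DROP TABLE IF EXISTS cryptocurrency;
        #   CREATE TABLE IF NOT EXISTS cryptocurrency (_id VARCHAR PRIMARY KEY, ...)
        # Interpolating identifiers with f-strings is only acceptable here
        # because the names come from the hard-coded list above.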
    # Create the topic tables. A topic table holds the entries from the
    # corresponding subreddit's main table in which a topic was detected.
    # These tables are optimised for speed, since they are the ones
    # queried by external APIs.
    # TODO (IMPORTANT): store a native PostgreSQL timestamp column instead
    # of the unix timestamp, so the conversion does not have to happen on
    # every query (speed tests show this cuts query time from 450 ms to
    # 280 ms over 10 mln rows).
    con = psycopg2.connect(**db_details, dbname='reddit')
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    with con.cursor() as cur:
        # _id is the primary key so that re-running topic extraction over
        # a table with already-present records updates them rather than
        # adding new ones every time.
        cols = """
            _id VARCHAR PRIMARY KEY,
            created_utc INT,
            is_comment BOOLEAN,
            topic VARCHAR
        """
        # Topic tables reuse the subreddit name plus a trailing underscore.
        for subreddit in [sub + '_' for sub in subreddits]:
            cur.execute(
                f'DROP TABLE IF EXISTS {subreddit};'
                f'CREATE TABLE IF NOT EXISTS {subreddit} ({cols})'
            )
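

# The aggregation functions created below read from the topic tables
# (note the hard-coded cryptocurrency_ in the query), not from the main
# tables, since the topic tables are the ones optimised for querying.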
def create_cron_jobs():
    cron_jobs = [
        {
            'name': 'hourly_data',
            'aggregation_helper': 'hour',
            'lookback_time_in_hours': 24,
            # Every hour at minute 5 (mentions are updated at minute 1,
            # so this runs just after them).
            'cron_schedule': '5 * * * *',
        },
        {
            'name': 'daily_data',
            'aggregation_helper': 'day',
            'lookback_time_in_hours': 24 * 24,
            # Every day at 01:05.
            'cron_schedule': '5 1 * * *',
        },
    ]
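    # Lookback windows: the hourly job aggregates the trailing 24 hours
    # into 24 hourly buckets; the daily job aggregates the trailing
    # 24 * 24 = 576 hours, i.e. 24 days, into 24 daily buckets.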
    # Inside the reddit database:
    # 1. Create the functions that cron will run.
    con = psycopg2.connect(**db_details, dbname='reddit')
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    with con.cursor() as cur:
        for job in cron_jobs:
            # The query below has two parts, separated by the marked break
            # in the middle. The first part gathers the topics and their
            # mention counts, which is simple. The second part exists
            # because there are gaps in the time series: filling them to
            # produce a proper, dense series is quite expensive, so it is
            # done once per scheduled run instead of on every API query.
            query = f"""
                DROP TABLE IF EXISTS {job['name']};
                CREATE TABLE {job['name']} AS
                WITH last_period AS (
                    SELECT (
                        DATE_TRUNC('{job['aggregation_helper']}', CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
                        - INTERVAL '0 {job['aggregation_helper']}'
                    )::timestamp AS last_timestamp
                ),
                data_ AS (
                    SELECT * FROM cryptocurrency_
                    WHERE
                        created_utc <= EXTRACT(EPOCH FROM (SELECT last_timestamp FROM last_period))
                        AND created_utc > EXTRACT(EPOCH FROM (SELECT last_timestamp FROM last_period))
                            - ({job['lookback_time_in_hours']} * 60 * 60)
                ),
                data_with_time_gaps AS (
                    SELECT
                        topic,
                        DATE_TRUNC('{job['aggregation_helper']}', TIMESTAMP 'epoch' + created_utc * INTERVAL '1 second') AS gran,
                        COUNT(*) AS count
                    FROM data_
                    GROUP BY gran, topic
                ),
                -----------------------
                distinct_topics AS (
                    SELECT DISTINCT topic FROM data_with_time_gaps
                ),
                distinct_grans AS (
                    SELECT generate_series(min(gran), max(gran), '1 {job['aggregation_helper']}'::interval) AS gran
                    FROM data_with_time_gaps
                ),
                topic_gran_combinations AS (
                    SELECT dt.topic, dg.gran
                    FROM distinct_topics dt, distinct_grans dg
                )
                SELECT tgc.topic, tgc.gran, COALESCE(yt.count, 0) AS count
                FROM topic_gran_combinations tgc
                LEFT JOIN data_with_time_gaps yt ON tgc.topic = yt.topic AND tgc.gran = yt.gran
                ORDER BY tgc.topic, tgc.gran;
            """
            cur.execute(
                f"""
                CREATE OR REPLACE FUNCTION {job['name']}()
                RETURNS VOID AS
                $$
                BEGIN
                    {query}
                END;
                $$
                LANGUAGE plpgsql;
                """
            )
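            # CREATE OR REPLACE makes this idempotent: re-running the
            # script swaps in a new function body without a DROP FUNCTION.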
    # Inside the postgres database:
    # 2. Activate the pg_cron extension and remove all cron jobs (if any).
    # 3. Create the cron jobs.
    con = psycopg2.connect(**db_details, dbname='postgres')
    con.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    with con.cursor() as cur:
        cur.execute(
            """
            CREATE EXTENSION IF NOT EXISTS pg_cron;
            TRUNCATE TABLE cron.job;
            TRUNCATE TABLE cron.job_run_details;
            """
        )
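        # Note: pg_cron has to be listed in shared_preload_libraries, and
        # by default its metadata tables live in the database named by the
        # cron.database_name setting ('postgres' unless configured
        # otherwise), which is why this block connects to postgres.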
        for job in cron_jobs:
            cur.execute(
                f"""
                SELECT cron.schedule_in_database(
                    '{job['name']}',
                    '{job['cron_schedule']}',
                    'SELECT {job['name']}()',
                    'reddit'
                );
                """
            )
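        # Optional sanity check, e.g. from psql:
        #   SELECT jobid, jobname, schedule, command FROM cron.job;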


if __name__ == '__main__':
    # Careful: running create_database_and_tables() drops and recreates
    # the tables, erasing the data in them. It stays commented out so it
    # cannot run by accident; uncomment only for a fresh setup.
    # create_database_and_tables()
    create_cron_jobs()