This is the source code for project 6.
- src/: contains all the source code
- src/.../types: contains the types we need (e.g. CustomTuple2 classes) for the different queries
- src/.../: contains code that reads from a data file and injects messages into the Kafka ingress. This program is run within the producer.
- Undertow server: listens for requests
- src/.../: contains the query code for processing the in-edges, in-k-hop, and in-triangle queries
- src/.../: contains the query code for processing the out-edges, out-k-hop, and out-triangle queries
- src/.../: contains the query code for the time window query. See API for more details.
- src/.../EventsFilterFn: contains the code of our main event handler function, which receives all requests and sends each request to the appropriate query function
- latencyTest/: this folder contains the code to generate the latency graph, and the graphs themselves
- data folder: contains the data files
Time Window Query
- task type: GET_TIME_WINDOW_EDGES
- required parameters: src for vertex to query on, t for starting timestamp, endTime for ending timestamp
- this query outputs all outgoing edges from source node src between time t and endTime
In-Edges Query
- task type: GET_IN_EDGES
- required parameters: src for vertex to query on, t for timestamp (currently not used, but required)
- this query retrieves all incoming edges of vertex src
Out-Edges Query
- task type: GET_OUT_EDGES
- required parameters: src for vertex to query on, t for timestamp (currently not used, but required)
- this query retrieves all outgoing edges of vertex src
In-K-Hop Query
- task type: IN_K_HOP
- required parameters: src for vertex to query on, k for the number of hops
- this query retrieves the neighbor that is k hops away from node src by traversing incoming edges
Out-K-Hop Query
- task type: OUT_K_HOP
- required parameters: src for vertex to query on, k for the number of hops
- this query retrieves the neighbor that is k hops away from node src by traversing outgoing edges
In-Triangles Query
- task type: IN_TRIANGLES
- required parameters: src for vertex to query on
- this query retrieves the nodes within a unidirectional triangle of node src by traversing incoming edges
Out-Triangles Query
- required parameters: src for vertex to query on
- this query retrieves the nodes within a unidirectional triangle of node src by traversing outgoing edges
Recommendation Query
- required parameters: src for vertex to query on, t for timestamp (currently not used, but required)
- this query retrieves the potential recommendation candidates for node src based on outgoing connections of an incoming neighbor of node dst
- from the root directory of the source code, run
cd projectCode
to go into the actual source directory (if you are already inside the projectCode directory, you can skip this step)
- run
to build and run the stateful functions
- open Docker Desktop and click graph-analytics to see messages being sent and received
- If you prefer reading logs produced by each container in the terminal, run
make kafka-terminal
Important Note: The ZooKeeper and Kafka broker containers use the arm64 architecture. If you don't have Docker Desktop, you might not be able to run them on your local machine, depending on its architecture. In that case, try using the amd64 version of these containers.
We currently have two ingresses: one takes HTTP requests as input events, and the other takes Kafka messages as input events. Therefore, we can send events/queries via curl commands or through Kafka.
All executable events are of the execute type and follow this JSON format:
{"task": <executable task>, "src": <src vertexid>, "dst": <dst vertexid>, "t": <timestamp>, "endTime": <endtime for time window query>, "k": <number of hops of k hop query>}
Not all of the fields are needed. For example, endTime and k are only used by specific queries, so you don't need to specify every field when sending events. Check the API of the specific query for its required fields.
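As a rough illustration of sending only the fields a query needs, the sketch below builds an execute payload in Python. REQUIRED_FIELDS and build_execute_payload are hypothetical names, not part of the project. The table covers only the task types spelled out in this README, and it assumes the vertex field is src, as in the JSON template above.

```python
import json

# Hypothetical lookup table: task type -> fields its query section lists as required.
# Only task types named in this README are included; src as the vertex field is an
# assumption taken from the JSON template.
REQUIRED_FIELDS = {
    "GET_TIME_WINDOW_EDGES": ("src", "t", "endTime"),
    "GET_IN_EDGES": ("src", "t"),
    "GET_OUT_EDGES": ("src", "t"),
    "IN_K_HOP": ("src", "k"),
    "OUT_K_HOP": ("src", "k"),
    "IN_TRIANGLES": ("src",),
}


def build_execute_payload(task, **fields):
    """Return the JSON string for an execute event, keeping only required fields."""
    if task not in REQUIRED_FIELDS:
        raise ValueError(f"unknown task type: {task}")
    missing = [name for name in REQUIRED_FIELDS[task] if name not in fields]
    if missing:
        raise ValueError(f"{task} is missing required fields: {missing}")
    payload = {"task": task}
    for name in REQUIRED_FIELDS[task]:
        payload[name] = fields[name]
    return json.dumps(payload)


# Prints {"task": "IN_K_HOP", "src": 1, "k": 2}
print(build_execute_payload("IN_K_HOP", src=1, k=2))
```

Extra keyword arguments are silently dropped and missing ones raise a ValueError, so a malformed event is caught before it reaches an ingress.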
The supported executable tasks are:
To send queries via curl, use the following template:
curl -X PUT -H "Content-Type: application/vnd.graph-analytics.types/execute" -d <execute JSON> localhost:8090/graph-analytics.fns/filter/1
# this curl command fetches all incoming edges for source vertex 1 at timestamp 123001
curl -X PUT -H "Content-Type: application/vnd.graph-analytics.types/execute" -d '{"task": "GET_IN_EDGES", "src": 1, "t": 123001}' localhost:8090/graph-analytics.fns/filter/1
# this curl command fetches all outgoing edges for source vertex 1 with 123001 <= t <= 125001
curl -X PUT -H "Content-Type: application/vnd.graph-analytics.types/execute" -d '{"task": "GET_TIME_WINDOW_EDGES", "src": 1, "t": 123001, "endTime": 125001}' localhost:8090/graph-analytics.fns/filter/1
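For scripted tests, the same PUT request can be issued without curl. The following is a minimal sketch using only the Python standard library. The URL and content type are copied from the curl template above, while build_request and send are hypothetical helper names. Nothing is sent until send is called, which requires the containers to be running.

```python
import json
import urllib.request

# Endpoint and content type copied from the curl template above.
URL = "http://localhost:8090/graph-analytics.fns/filter/1"
CONTENT_TYPE = "application/vnd.graph-analytics.types/execute"


def build_request(event):
    """Build (but do not send) the PUT request for one execute event."""
    body = json.dumps(event).encode("utf-8")
    return urllib.request.Request(
        URL,
        data=body,
        method="PUT",
        headers={"Content-Type": CONTENT_TYPE},
    )


def send(request, timeout=5.0):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status


request = build_request({"task": "GET_IN_EDGES", "src": 1, "t": 123001})
print(request.get_method(), request.full_url)
# Call send(request) once the containers are up.
```

Serializing the event with json.dumps avoids the shell-quoting problems that hand-written JSON on the command line tends to cause.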
Kafka is set up according to this guide, using docker-compose, so running docker-compose automatically sets up the broker. After docker-compose up -d, topics have to be created manually, since automatic topic creation at startup is not set up yet. Run the following commands to create the topics:
docker exec broker \
kafka-topics --bootstrap-server broker:29092 \
--create \
--topic tasks
If you have Kafka installed on your device, then you can enter this instead:
kafka-topics --bootstrap-server localhost:9092 \
--create \
--topic tasks
To write to Kafka:
To write messages to a Kafka topic, we use the kafka-console-producer command line tool. This is good for testing, but during simulation we'll be using the Kafka Connect/Producer API to read text files and send graph edges to the Flink application. To send a single message, run the following in the terminal:
docker exec --interactive --tty broker \
kafka-console-producer --bootstrap-server broker:29092 \
--topic tasks \
--property parse.key=true \
--property key.separator="|"
If you have Kafka installed on your device, then you can enter this instead:
kafka-console-producer --bootstrap-server localhost:9092 \
--topic tasks \
--property parse.key=true \
--property key.separator="|"
Then you can write messages:
3|{"task": "ADD", "src": "3", "dst": "4", "t": "1254194656", "k": "0"}
1|{"task": "ADD", "src": "1", "dst": "4", "t": "1254192988", "k": "0"}
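When many edges need to be typed or piped into the console producer, the key|value lines can be generated instead of written by hand. The sketch below reproduces the format of the two sample messages. format_edge_record is a hypothetical helper, and it assumes what those samples suggest: the message key equals src, every field value is a string, and k is "0" for ADD events.

```python
import json


def format_edge_record(src, dst, t, separator="|"):
    """Return one 'key<separator>value' line for an ADD event.

    Assumptions taken from the sample messages: the key is the source
    vertex id, all values are sent as strings, and k is always "0".
    """
    value = {"task": "ADD", "src": str(src), "dst": str(dst), "t": str(t), "k": "0"}
    return f"{src}{separator}{json.dumps(value)}"


# Prints 3|{"task": "ADD", "src": "3", "dst": "4", "t": "1254194656", "k": "0"}
print(format_edge_record(3, 4, 1254194656))
```

The separator argument has to match the key.separator property passed to kafka-console-producer.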
To read messages from Kafka on the terminal:
To read messages from the tasks topic:
docker exec --interactive --tty broker \
kafka-console-consumer --bootstrap-server broker:29092 \
--topic tasks
If you have Kafka installed on your device, then you can enter this instead:
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic tasks
To list the topics in Kafka:
docker exec broker \
kafka-topics --bootstrap-server broker:29092 \
--list
If you have Kafka installed on your device, then you can enter this instead:
kafka-topics --bootstrap-server localhost:9092 \
--list
To request metadata from the broker inside Docker:
docker run -it --rm --network=projectcode_default edenhill/kcat:1.7.1 -b broker:29092 -L