This repository contains Apache Flink programs that produce, consume and process traffic data.

The producer (FlinkFcdProducer.java) reads the records from a gzipped file in the file system and writes them to a Kafka topic in binary format (Avro).

The consumer (FlinkFcdConsumer.java) reads the records from the Kafka topic, separates them into cells within a bounding box, computes the number of records within each cell in time windows and prints the results to stdout.

The MapMatch program (FlinkMapMatch.java, work in progress) reads the records from a Kafka topic, separates them by computing their geohash and then matches the coordinate pairs to road segments. Finally, the program computes the number of vehicles and the average speed in each road segment within a time window. The result of the computation for each road segment, and for the whole time interval, is sent to a sink (TBD, Elasticsearch or Cassandra).
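As a rough illustration of the cell-and-window pattern used by the consumer, the sketch below keys a Kafka stream by a cell id and counts the records in tumbling time windows. It is not the actual FlinkFcdConsumer code: the topic name, broker address, plain-string record format (the real records are Avro-encoded) and the toCellId() helper are placeholders, and the exact Kafka connector class depends on the Flink version in use.

```java
// Sketch only: counts records per grid cell in tumbling time windows.
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class CellCountSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address

    env.addSource(new FlinkKafkaConsumer<>("taxi", new SimpleStringSchema(), props))
       .map(line -> Tuple2.of(toCellId(line), 1))              // assign each record to a grid cell
       .returns(Types.TUPLE(Types.STRING, Types.INT))
       .keyBy(t -> t.f0)                                       // group by cell id
       .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
       .sum(1)                                                 // number of records per cell and window
       .print();                                               // the real jobs write to a sink instead

    env.execute("fcd-consumer-sketch");
  }

  // Hypothetical helper: derive a cell id (e.g. a geohash) from a record's coordinates.
  private static String toCellId(String line) {
    return line.split(",")[0]; // placeholder, not the real parsing logic
  }
}
```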
The MapMatch program depends on PostGIS with the road network data preloaded, on R for the map-matching algorithm and on Rserve for the communication between the Java code and R. All these modules are included in pilot-sc4-postgis; start the pilot-sc4-postgis Docker container before running the MapMatch program.
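The Java side typically talks to R through the Rserve Java client (the REngine library). The snippet below is a hypothetical sketch of such a call, not the actual pilot code: the host, the port (Rserve's default) and the R function name match_point are assumptions.

```java
// Hypothetical sketch: evaluate an R map-matching routine over Rserve.
import org.rosuda.REngine.REXP;
import org.rosuda.REngine.Rserve.RConnection;

public class RserveMapMatchSketch {

  public static void main(String[] args) throws Exception {
    // Connect to the Rserve instance running alongside PostGIS (host and port are placeholders)
    RConnection r = new RConnection("localhost", 6311);
    try {
      // match_point is an assumed R function that snaps a coordinate pair to a road segment
      REXP segment = r.eval("match_point(12.4924, 41.8902)");
      System.out.println("Matched road segment: " + segment.asString());
    } finally {
      r.close();
    }
  }
}
```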
The software is based on Maven and can be built from the project root folder by running the command
$ mvn install
This component can be run as a Java application, passing an argument that selects the Flink application to run plus further job-specific parameters.
In order to start a producer, run the following command:
$ java -jar target/pilot-sc4-fcd-applications-0.10.0-jar-with-dependencies.jar producer -path <path_to_the_gzipped_file> -topic <a kafka topic>
The job can also be started from the Flink JobManager; see the Flink JobManager Quick Start Setup to learn how to do it. Once Flink is started, you can submit a job by uploading the project jar file and setting the following parameters:
Entry Class: eu.bde.pilot.sc4.fcd.FlinkFcdProducer
Program Arguments: --path <path_to_the_gzipped_file> --topic <a kafka topic>
In order to start a consumer, run the following command:
$ java -jar target/pilot-sc4-fcd-applications-0.10.0-jar-with-dependencies.jar consumer-elasticsearch -topic <a kafka topic> -window <minutes>
This job can also be started from the Flink JobManager using the same jar file (you don't have to upload it again) and setting the following parameters:
Entry Class: eu.bde.pilot.sc4.fcd.FlinkFcdConsumerElasticsearch
Program Arguments: --topic <a kafka topic> --window <minutes>
In order to start the map-match job, run the following command:
$ java -jar target/pilot-sc4-fcd-applications-0.10.0-jar-with-dependencies.jar mapmatch -topic <a kafka topic> -window <minutes>
You can submit the MapMatch job to the Flink JobManager by setting the following parameters:
Entry Class: eu.bde.pilot.sc4.fcd.FlinkMapMatch
Program Arguments: mapmatch --topic <a kafka topic> --window <minutes>
Before submitting a second or further job to the TaskManager, be sure to set the number of task slots equal to the number of jobs you want the TaskManager to run. From your Flink installation root folder, open conf/flink-conf.yaml and set
taskmanager.numberOfTaskSlots: 2
to run the producer and the consumer locally.
The HUE service may not accept uploads of files bigger than a defined maximum size. This is caused by the configuration of the image nginx-proxy-with-css (csswrapper), which forwards requests and has a maximum size set for request bodies. In order to overcome this problem, the nginx-proxy-with-css image is derived into the image bde2020/pilot-sc4-csswrapper and updated by copying a modified nginx.conf file, config/nginx.conf, where the maximum size is set to 300M with the option client_max_body_size:
http {
...
client_max_body_size 300M;
...
}
The derived image is defined in sc4-csswrapper.dockerfile and built by running
$ docker-compose -f stack.yml build csswrapper
If bigger files are to be uploaded by HUE, change the client_max_body_size and build the image again.
## Licence
Apache 2.0