feat: add end-to-end data collection pipeline scripts
TomShiDi committed Mar 20, 2024
1 parent f5daa46 commit 5c6c39e
Showing 5 changed files with 233 additions and 0 deletions.
54 changes: 54 additions & 0 deletions daq-cluster.sh
@@ -0,0 +1,54 @@
#!/bin/bash

case $1 in
"start"){
echo "================== Starting cluster =================="

# Start the Zookeeper cluster
zk-cluster.sh start

# Start the Hadoop cluster
hadoop-cluster.sh start

# Start the Kafka cluster
kafka.sh start

# Start the log-collection Flume agents
f1.sh start

# Start the log-consumer Flume agent
f2.sh start

# Start the business-data-consumer Flume agent
f3.sh start

# Start Maxwell
maxwell.sh start

};;
"stop"){
echo "================== Stopping cluster =================="

# Stop Maxwell
maxwell.sh stop

# Stop the business-data-consumer Flume agent
f3.sh stop

# Stop the log-consumer Flume agent
f2.sh stop

# Stop the log-collection Flume agents
f1.sh stop

# Stop the Kafka cluster
kafka.sh stop

# Stop the Hadoop cluster
hadoop-cluster.sh stop

# Stop the Zookeeper cluster
zk-cluster.sh stop

};;
esac
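
With the helper scripts in place, the whole collection pipeline can be driven from one control node. A minimal usage sketch, assuming daq-cluster.sh and the scripts it calls (zk-cluster.sh, hadoop-cluster.sh, kafka.sh, f1.sh, f2.sh, f3.sh, maxwell.sh) are executable and on the PATH:

# bring the whole pipeline up, dependencies first
daq-cluster.sh start
# tear it down again, consumers first and storage last
daq-cluster.sh stop
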
13 changes: 13 additions & 0 deletions f3.sh
@@ -0,0 +1,13 @@
#!/bin/bash

case $1 in
"start")
echo " --------启动 node3 业务数据flume-------"
ssh node3 "nohup /opt/server/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/server/flume-1.9.0/conf -f /opt/server/flume-1.9.0/job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &"
;;
"stop")

echo " --------停止 node3 业务数据flume-------"
ssh node3 "ps -ef | grep kafka_to_hdfs_db.conf | grep -v grep |awk '{print \$2}' | xargs -n1 kill"
;;
esac
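
f1.sh and f2.sh, which daq-cluster.sh also calls, are not part of this commit; they presumably follow the same ssh-plus-flume-ng pattern with a different host list and job file. A hypothetical sketch of f2.sh under that assumption (the host name and the kafka_to_hdfs_log.conf file name are illustrative, not taken from this commit):

#!/bin/bash

case $1 in
"start")
        echo " -------- Starting log-consumer Flume on node3 --------"
        ssh node3 "nohup /opt/server/flume-1.9.0/bin/flume-ng agent -n a1 -c /opt/server/flume-1.9.0/conf -f /opt/server/flume-1.9.0/job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &"
;;
"stop")
        echo " -------- Stopping log-consumer Flume on node3 --------"
        ssh node3 "ps -ef | grep kafka_to_hdfs_log.conf | grep -v grep | awk '{print \$2}' | xargs -n1 kill"
;;
esac
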
61 changes: 61 additions & 0 deletions flume-demo/src/main/java/com/tomshidi/flume/interceptor/TimestampAndTableNameInterceptor.java
@@ -0,0 +1,61 @@
package com.tomshidi.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
* Puts the table name and timestamp from the record body into the event headers.
* @author TomShiDi
* @since 2024/3/20 13:39
*/
public class TimestampAndTableNameInterceptor implements Interceptor {

@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {
String bodyStr = new String(event.getBody(), StandardCharsets.UTF_8);
JSONObject jsonObject = JSON.parseObject(bodyStr);
String table = jsonObject.getString("table");
// Maxwell emits "ts" in seconds; the HDFS sink expects the "timestamp" header in milliseconds.
long ts = jsonObject.getLong("ts") * 1000;
Map<String, String> headers = event.getHeaders();
headers.put("table", table);
headers.put("timestamp", Long.toString(ts));
return event;
}

@Override
public List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);
}
return events;
}

@Override
public void close() {

}

public static class Builder implements Interceptor.Builder{
@Override
public Interceptor build() {
return new TimestampAndTableNameInterceptor();
}

@Override
public void configure(Context context) {

}
}
}
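
A quick way to sanity-check the interceptor outside a running agent is to hand it a Maxwell-style record directly. A minimal sketch, placed in the same package as the interceptor (the InterceptorSmokeTest class and the sample record are illustrative; it assumes flume-ng-core and fastjson are on the classpath):

package com.tomshidi.flume.interceptor;

import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

import java.nio.charset.StandardCharsets;

public class InterceptorSmokeTest {
    public static void main(String[] args) {
        // Maxwell change record: "ts" is in seconds.
        String body = "{\"database\":\"gmall\",\"table\":\"order_info\",\"type\":\"insert\",\"ts\":1710912000,\"data\":{\"id\":1}}";
        Event event = EventBuilder.withBody(body, StandardCharsets.UTF_8);
        new TimestampAndTableNameInterceptor().intercept(event);
        // Expected headers: table=order_info, timestamp=1710912000000
        System.out.println(event.getHeaders());
    }
}
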
39 changes: 39 additions & 0 deletions flume-demo/src/main/resources/kafka_to_hdfs_db.conf
@@ -0,0 +1,39 @@
# agent
a1.sources=r1
a1.sinks=k1
a1.channels=c1

# source
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.channels=c1
a1.sources.r1.batchSize=5000
a1.sources.r1.batchDurationMillis=2000
a1.sources.r1.kafka.bootstrap.servers=node1:9092,node2:9092
a1.sources.r1.kafka.topics=topic_db
a1.sources.r1.kafka.consumer.group.id=flume
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=com.tomshidi.flume.interceptor.TimestampAndTableNameInterceptor$Builder

# channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/server/flume/checkpoint/behavior2
a1.channels.c1.dataDirs=/opt/server/flume/data/behavior2
a1.channels.c1.maxFileSize=2146435071
a1.channels.c1.capacity=1000000
a1.channels.c1.keep-alive=6

# sink
a1.sinks.k1.type=hdfs
a1.sinks.k1.hdfs.path=/origin_data/gmall/db/%{table}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix=db-
a1.sinks.k1.hdfs.round=false
a1.sinks.k1.hdfs.rollInterval=10
a1.sinks.k1.hdfs.rollSize=134217728
a1.sinks.k1.hdfs.rollCount=0

a1.sinks.k1.hdfs.fileType=CompressedStream
a1.sinks.k1.hdfs.codeC=gzip

# bind
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
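
Because the interceptor puts the table name and an event-time timestamp into the headers, the %{table} and %Y-%m-%d escapes in hdfs.path resolve per table and per day from the record itself (hdfs.round stays false and hdfs.useLocalTimeStamp is not needed). A quick way to verify after some data has flowed, using the standard HDFS client (the table and date below are just an example):

hdfs dfs -ls /origin_data/gmall/db/order_info_inc/2024-03-20
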
66 changes: 66 additions & 0 deletions mysql_to_kafka_inc_init.sh
@@ -0,0 +1,66 @@
#!/bin/bash

# This script bootstraps all incremental tables; it only needs to be run once.

MAXWELL_HOME=/opt/server/maxwell

import_data() {
$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}

case $1 in
"cart_info")
import_data cart_info
;;
"comment_info")
import_data comment_info
;;
"coupon_use")
import_data coupon_use
;;
"favor_info")
import_data favor_info
;;
"order_detail")
import_data order_detail
;;
"order_detail_activity")
import_data order_detail_activity
;;
"order_detail_coupon")
import_data order_detail_coupon
;;
"order_info")
import_data order_info
;;
"order_refund_info")
import_data order_refund_info
;;
"order_status_log")
import_data order_status_log
;;
"payment_info")
import_data payment_info
;;
"refund_payment")
import_data refund_payment
;;
"user_info")
import_data user_info
;;
"all")
import_data cart_info
import_data comment_info
import_data coupon_use
import_data favor_info
import_data order_detail
import_data order_detail_activity
import_data order_detail_coupon
import_data order_info
import_data order_refund_info
import_data order_status_log
import_data payment_info
import_data refund_payment
import_data user_info
;;
esac
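
Typical usage, assuming the Maxwell daemon started by maxwell.sh is already running and its config.properties points at the gmall database:

# bootstrap a single table
mysql_to_kafka_inc_init.sh order_info
# or bootstrap every incremental table in one pass
mysql_to_kafka_inc_init.sh all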
