该篇文章是基于Flink的数据开发类文章,作为该系列的第一篇文章,主要希望大家可以通过本篇文章了解基于Flink实时数据处理的设计和开发。
对于Flink的基本原理可以参照Flink官方网站:https://flink.apache.org 。
基于Flink 1.11的网络流量实时解析,主要针对基于Pcap的原始网络流量数据进行TCP/UDP/ICMP的协议数据实时解析,并将解析数据装成数据帧Frame,以便进行实时网络流量分析。
为完成以上功能,需要了解Pcap数据解析、TCP/UDP层协议解析、Flink的序列化和反序列化、Flink自定义函数以及基于Stream sql的Flink实时数据分析。
1、Pcap数据解析
要进行基于Pcap格式的网络流量数据解析,就必须了解Pcap文件格式定义:
如上所示,标准Pcap数据由Pcap文件头、数据桢Frame头、数据桢Frame组成。
在Pcap文件头中:Magic :0x1A2B3C 4D,用于表示Pcap数据的开始;Major:用于标示Pcap数据主版本号;Minor:用于标示Pcap数据次版本号;ThisZone:本地标准时间;SigFigs: 时间戳精度;SnapLen:最大的存储长度;LinkType:链路类型。
在数据桢头中:Timestamp1:时间戳高位,精确到S;Timestamp2:时间戳低位,精确到ms;CapLen:当前数据桢长度;
Len:网络中实际数据桢的长度。
注意:目前LinkType链路类型,支持EN10MB、RAW、LOOP、LINUX_SLI;通过以上基本结构,在Pcap文件头中,我们获取最有用的信息即时LinkType,后面我们需要根据不同的LinkType类型,进行数据桢Frame的解析。
除此之外,根据数据桢头,可以获得数据桢的封装时间;
这里根据以太网数据桢类型为例:也就是Ipv4、Ipv6、ARP数据桢,如上图所示,该类型的数据桢数据部分的偏移是14。如果是Ipv4或者Ipv6的协议类型,可以解析获取Mac地址。接下来,其实就是解析TCP/IP层的协议。
2、TCP/UDP协议解析
(1)、TCP协议
// 获取TCP头大小
tcpOrUdpHeaderSize = getTcpHeaderLength(packetData, ipStart + ipHeaderLen);
packet.put(Packet.TCP_HEADER_LENGTH, tcpOrUdpHeaderSize);
// Store the sequence and acknowledgement numbers --M
// 获取TCP 请求序列号
packet.put(Packet.TCP_SEQ, PcapReaderUtil.convertUnsignedInt(packetData, ipStart + ipHeaderLen + PROTOCOL_HEADER_TCP_SEQ_OFFSET));
// 获取TCP 确认序列号
packet.put(Packet.TCP_ACK, PcapReaderUtil.convertUnsignedInt(packetData, ipStart + ipHeaderLen + PROTOCOL_HEADER_TCP_ACK_OFFSET));
// Flags stretch two bytes starting at the TCP header offset
int flags = PcapReaderUtil.convertShort(new byte[] { packetData[ipStart + ipHeaderLen + TCP_HEADER_DATA_OFFSET],
packetData[ipStart + ipHeaderLen + TCP_HEADER_DATA_OFFSET + 1] })
& 0x1FF; // Filter first 7 bits. First 4 are the data offset and the other 3 reserved for future use.
packet.put(Packet.TCP_FLAG_NS, (flags & 0x100) == 0 ? false : true);
packet.put(Packet.TCP_FLAG_CWR, (flags & 0x80) == 0 ? false : true);
packet.put(Packet.TCP_FLAG_ECE, (flags & 0x40) == 0 ? false : true);
packet.put(Packet.TCP_FLAG_URG, (flags & 0x20) == 0 ? false : true);
packet.put(Packet.TCP_FLAG_ACK, (flags & 0x10) == 0 ? false : true);
packet.put(Packet.TCP_FLAG_PSH, (flags & 0x8) == 0 ? false : true);
packet.put(Packet.TCP_FLAG_RST, (flags & 0x4) == 0 ? false : true);
packet.put(Packet.TCP_FLAG_SYN, (flags & 0x2) == 0 ? false : true);
packet.put(Packet.TCP_FLAG_FIN, (flags & 0x1) == 0 ? false : true);
2、UDP协议
tcpOrUdpHeaderSize = UDP_HEADER_SIZE;
if (ipProtocolHeaderVersion == 4) {
int cksum = getUdpChecksum(packetData, ipStart, ipHeaderLen);
if (cksum >= 0)
packet.put(Packet.UDP_SUM, cksum);
}
int udpLen = getUdpLength(packetData, ipStart, ipHeaderLen);
packet.put(Packet.UDP_LENGTH, udpLen);
3、Kafka的序列化和反序列化
基于分布式消息队列Kafka作为网络流量数据的中间临时缓存,通过FlinkKafkaConsumer进行网络流数据的解析,这里我们自定义了PcapResover的解析器,使用自定义的解序列化函数PcapDataDeSerializer。
Kafka Producer,负责转发已采集的网络流量,这里配置使用了Kafka内部的序列化类
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
this.consumer = new FlinkKafkaConsumer<>(this.topic,(KafkaDeserializationSchema)
new PcapDataDeSerializer(Object.class),props);
public class PcapDataDeSerializer implements KafkaDeserializationSchema<Object> {
private static final Logger log= LoggerFactory.getLogger(PcapDataDeSerializer.class);
private static final long serialVersionUID = 1L;
private Class<Object> clazz;
public PcapDataDeSerializer(Class<Object> clazz) {
this.clazz=clazz;
}
List<Packet> packetList = new ArrayList<>();
@Override
public boolean isEndOfStream(Object nextElement) {
return false;
}
@Override
public Object deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));
DataInputStream dataInputStream=new DataInputStream((InputStream) in);
PcapReader reader = new PcapReader(dataInputStream);
for (Packet packet : reader) {
packetList.add(packet);
}
log.info("finish deserialize pcap data ,"+record.key()+" , topic is "+record.topic()+", "+
"partition is "+record.partition()+" , "+" offset is " +record.offset());
return JSON.toJSON(packetList);
}
@Override
public TypeInformation<Object> getProducedType() {
return TypeExtractor.getForClass(this.clazz);
}
}
PcapDataDeSerializer主要实现KafkaDeserializationSchema<Object>中的deserialze即可,在这个函数中,会解析网络流量,并解析的网络流量封装成Pcaket List对象中,进行返回。
KafkaConsumer的创建使用自定义解序列化函数,主要是为了根据1、2 部分对于Pcap网络流量格式的分析,解析网络流量,并封装成数据桢。
4、Flink自定义函数
基于以上创建的FlinkKafkaConsumer,可以配置Flink Stream DAG,DataStreamSouce ->flatMap->Map->Stream<Frame>
DataStreamSource<Object> stream =
executionEnvironment.addSource(this.consumer);
log.info("start to build pcap dataStream DAG graph , transform packet into frame stream, " +
"and default parallelism is 4 !");
return stream.flatMap(new FrameFlatMap()).
map(new FrameMapFunction()).setParallelism(4);
这里其实返回的是DataStream<Frame>,也就是说,我们将原始网络流量解析,最后按照数据桢的方式输出数据流,以便与进行数据分析。接下来,为了基于Stream sql做一些数据分析,其实就可以将DataStream注册成临时表视图,然后使用类sql的语法进行实时分析了。
5、Flink实时分析示例
聚合统计10s的窗口内,目的mac地址的计数。当然这里sql的表达方式很多,而且表达能力足够强大。可以根据不同的业务诉求,进行不同的分析。
aggregationSql = "select dstMac,count(1) as c from " + KafkaProperties.FRAME_VIEW_NAME +
" group by tumble(PROCTIME() ,interval '10' SECOND) " +
", dstMac";
之后就是进行sink了,完成DAG 构建完成,Excute提交任务到集群。
Table result = streamTableEnvironment.sqlQuery(sql);
DataStream<Row> resultData = streamTableEnvironment.toAppendStream(result, Row.class);
resultData.print();
总结一下,基本流程如下图所示:
主要通过配置FlinkKafkaConsumer,实现PcapDataDesrializer负责对Pcap数据包中的Frame进行反序列化处理和解析,形式基于Frame的流数据,之后通过自定义FlatMapFunction、MapFunction函数对流数据进行处理和封装成为原始派生流DataStream<Frame>。
来源:freebuf.com 2020-10-26 13:45:43 by: 龙渊实验室LongYuanLab
请登录后发表评论
注册