Targeting JD.com product reviews, the pipeline uses a crawler + Flume + Kafka + Spark Streaming + MySQL to collect, analyze, and display the data dynamically and in real time.
# -*- coding: utf-8 -*-
# @Time     : 2022/3/16 20:15
# @Author   : huaobin
# @File     : new.py
# @Software : PyCharm
import json
import time
import random
import itertools

import requests
import paramiko

headers = {
    "Cookie": "",
    "Accept": "*/*",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "Connection": "keep-alive",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.51 Safari/537.36 Edg/99.0.1150.39"
}

headers2 = {
    "accept": "*/*",
    "accept-encoding": "gzip, deflate, br",
    "accept-language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
    "cookie": "",
    "referer": "https://search.jd.com/"
}


def crawlProductComment(url):
    # Fetch the raw comment data and strip the fetchJSON_comment98(...) JSONP wrapper
    try:
        req = requests.get(url=url, headers=headers2).text
        reqs = req.replace("fetchJSON_comment98(", "").strip(');')
        print(reqs)
        jsondata = json.loads(reqs)
        # Pull out the list of product comments
        comments = jsondata['comments']
        return comments
    except Exception:
        print("Error: response could not be decoded (check the encoding, e.g. gbk)")
        return []


def getProduct(url):
    # Strip the jQuery1544821(...) JSONP wrapper and collect every product's sku_id
    ids = []
    req = requests.get(url=url, headers=headers2).text
    reqs = req.replace("jQuery1544821(", "").strip(')')
    jsondata = json.loads(reqs)['291']
    for i in range(0, len(jsondata)):
        ids.append(jsondata[i]['sku_id'])
    print(ids)
    return ids


# Server details: hostname (IP address), port, username and password
hostname = "hadoop102"
port = 22
username = "root"
password = "123456"

client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname, port, username, password, compress=True)
sftp_client = client.open_sftp()
remote_file = sftp_client.open("/opt/a.log", 'a+')  # set the remote file path yourself

ids = []
for i in range(2, 3):
    product_id = getProduct(
        "https://search-x.jd.com/Search?callback=jQuery1544821&area=5&enc=utf-8&keyword=%E7%94%B7%E5%A3%AB%E8%BF%90%E5%8A%A8%E9%9E%8B&adType=7&page="
        + str(i) + "&ad_ids=291%3A33&xtest=new_search&_=1647325621019")
    time.sleep(random.randint(1, 3))
    ids.append(product_id)

data = []
count = 0
for k in list(set(itertools.chain.from_iterable(ids))):
    for i in range(0, 100):
        url = 'https://club.jd.com/comment/productPageComments.action?callback=fetchJSON_comment98&productId=' \
              + str(k) + '&score=0&sortType=5&page=' + str(i) + '&pageSize=10&isShadowSku=0&fold=1'
        comments = crawlProductComment(url)
        if len(comments) <= 0:
            break
        print(comments)
        remote_file.writelines(str(len(comments)) + "\n")
        data.extend(comments)
        # Sleep between requests
        time.sleep(random.randint(1, 5))
        print('-------', i)
    print("This is product category {}".format(count))
    count += 1
The script above crawls JD.com products with Python and uploads the results to the virtual machine.
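Note that the inner loop above only writes the number of comments per page to /opt/a.log, while the Spark Streaming job later splits each line on commas and keys on the third field. One option is to write one comma-separated line per comment instead; the sketch below does that, and the comment keys it uses (referenceName, creationTime, content) are assumptions about JD's comment JSON rather than fields confirmed by the original code.

# Hypothetical variant of the write step inside the inner loop: emit one
# comma-separated line per comment so the downstream split(",") sees real fields.
# The keys referenceName, creationTime and content are assumptions about the
# comment JSON; adjust them to whatever fields you actually need.
for c in comments:
    line = ",".join([
        str(c.get("referenceName", "")),
        str(c.get("creationTime", "")),
        str(c.get("content", "")).replace(",", " ").replace("\n", " "),
    ])
    remote_file.writelines(line + "\n")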
Flume: write a conf file to configure the agent (connecting it to Kafka)
a1.sources=r1
a1.sinks=k1 k2
a1.channels=c1 c2
#define source
a1.sources.r1.type=exec
a1.sources.r1.channels=c1 c2
# the log file being monitored (the file the crawled data keeps being appended to)
a1.sources.r1.command=tail -F /training/testData/test-flume.log
a1.sources.r1.shell=/bin/sh -c
a1.sources.r1.selector.type=replicating
#sink1toKafka
a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
# the Kafka topic to write to (created automatically by the code in IDEA)
a1.sinks.k1.topic = test02
# for a non-cluster setup a single broker IP is enough on both of the following lines;
# keep comments on separate lines, since inline (Chinese) comments can cause errors at runtime
a1.sinks.k1.brokerList = 192.168.63.139:9092,192.168.63.140:9092,192.168.63.134:9092
a1.sinks.k1.kafka.bootstrap.servers = 192.168.63.139:9092,192.168.63.140:9092,192.168.63.134:9092
a1.sinks.k1.producer.requiredAcks = 1
a1.sinks.k1.batchSize = 5
a1.sinks.k1.channel=c1
#sink2toHDFS
a1.sinks.k2.type=hdfs
a1.sinks.k2.channel=c2
# the HDFS directory where output files should be written
a1.sinks.k2.hdfs.path=hdfs://192.168.63.139:9000/DataTest/datatest/test02
#channel1
a1.channels.c1.type=memory
#channel2
a1.channels.c2.type=memory
Flume startup command:
bin/flume-ng agent --name a1 --conf conf --conf-file conf/flume.conf -Dflume.root.logger=INFO,console
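Before wiring up Spark Streaming, it can be worth checking that records written by the crawler actually reach the test02 topic. Below is a minimal sketch using the kafka-python package, which is not part of the original setup; the broker addresses are the ones from the Flume sink configuration above.

# Sanity check: consume a few records from the test02 topic.
# kafka-python is an extra dependency, not something the original pipeline uses.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "test02",
    bootstrap_servers=["192.168.63.139:9092", "192.168.63.140:9092", "192.168.63.134:9092"],
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,  # stop iterating after 10 s without new messages
)

for message in consumer:
    print(message.value.decode("utf-8", errors="replace"))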
In IDEA, write the Spark Streaming code that receives the data and stores it in MySQL
package com.niit.spark

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object BigWork {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("BigWork").setMaster("local[2]")
    val streamingContext = new StreamingContext(sparkConf, Seconds(2))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "flume:9092", // change to your own VM hostname
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Kafka topics to subscribe to
    val topics = Array("test02", "t100")
    // Create the direct stream that listens on those topics
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))
    // Process the data: split each record on commas, take the third field and count occurrences
    val resultRDD: DStream[(String, Int)] = mapDStream
      .map(_._2.split(",")(2))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    resultRDD.print() // print the counts

    // For each RDD, open a JDBC connection per partition and store the counts in MySQL
    resultRDD.foreachRDD(rdd => {
      def func(records: Iterator[(String, Int)]): Unit = {
        var conn: Connection = null
        var stmt: PreparedStatement = null
        try {
          val url = "jdbc:mysql://localhost:3306/bigjob?useUnicode=true&characterEncoding=UTF-8"
          val user = "root"
          val password = "123456"
          conn = DriverManager.getConnection(url, user, password)
          val sql = "insert into bigwork(name, count) values (?, ?)"
          stmt = conn.prepareStatement(sql)
          records.foreach(p => {
            stmt.setString(1, p._1.trim)
            stmt.setInt(2, p._2)
            stmt.executeUpdate()
          })
        } catch {
          case e: Exception => e.printStackTrace()
        } finally {
          if (stmt != null) {
            stmt.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }

      val repartitionedRDD = rdd.repartition(3)
      repartitionedRDD.foreachPartition(func)
    })

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
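The insert statement above expects a MySQL database named bigjob containing a table bigwork(name, count); the original post does not show the DDL. Below is a minimal sketch for creating it with pymysql, where the library choice and the column types are assumptions.

# Minimal sketch: create the database and table the Spark job writes to.
# pymysql and the column definitions are assumptions; adapt them to your setup.
import pymysql

conn = pymysql.connect(host="localhost", user="root", password="123456", charset="utf8mb4")
try:
    with conn.cursor() as cur:
        cur.execute("CREATE DATABASE IF NOT EXISTS bigjob DEFAULT CHARACTER SET utf8mb4")
        cur.execute(
            "CREATE TABLE IF NOT EXISTS bigjob.bigwork ("
            "  id INT AUTO_INCREMENT PRIMARY KEY,"
            "  name VARCHAR(255),"
            "  `count` INT"
            ")"
        )
    conn.commit()
finally:
    conn.close()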
Finally, read the data back from the database and visualize it, for example with the sketch below.
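A minimal sketch of that last step, assuming pandas, SQLAlchemy and matplotlib are available (none of these are prescribed by the original setup): it sums the per-batch counts for each name and draws a bar chart.

# Minimal sketch: read the aggregated counts back from MySQL and plot them.
# pandas, SQLAlchemy and matplotlib are assumptions; any MySQL client and chart library will do.
import pandas as pd
import matplotlib.pyplot as plt
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123456@localhost:3306/bigjob?charset=utf8mb4")

# Each Spark micro-batch inserts its own rows, so sum the counts per name first
df = pd.read_sql("SELECT name, SUM(`count`) AS total FROM bigwork GROUP BY name", engine)

df.plot(kind="bar", x="name", y="total", legend=False)
plt.xlabel("name")
plt.ylabel("total count")
plt.tight_layout()
plt.show()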