本地运行PySpark处理wasbs blobstorage数据 作者: nbboy 时间: 2022-06-05 分类: Python 评论 ### azure wasbs协议 wasbs文件协议是azure blobstorage,spark需要安装一些jar包才能支持该协议,并不是在自带的core中。这里简单记录一下安装过程和遇到的一些坑。 ### 安装Jar包 在安装好后的spark home目录的jars下放入jar包: azure-storage-8.6.6.jar azure-storage-blob-11.0.1.jar hadoop-azure-3.2.0.jar 在运行的时候报错说Jetty没有安装,于是索性下载了一些Jetty包,请注意版本,因为我用的是jdk8,所以下载的是低版本。 jetty-http-9.3.24.v20180605.jar jetty-server-9.3.24.v20180605.jar jetty-util-9.3.24.v20180605.jar jetty-util-ajax-9.3.24.v20180605.jar ### 连接Pyspark 接下来做的事让Pyspark找到Spark,接着再设置SparkSession。根据电脑的配置设置,如果Driver内存设置的太小,跑的过程中就会爆出内存不够的错误,这是我的配置: ```python import findspark findspark.init() print(findspark.find()) import pyspark from pyspark.sql import SparkSession #Create SparkSession spark = SparkSession.builder \ .appName("HQV Model") \ .config("spark.executor.memory", "4g") \ .config("spark.driver.memory", "4g")\ .config("spark.dirver.maxResultSize", "4g")\ .master("local[*]") \ .config("spark.executor.cores", "4") \ .config("spark.default.parallelism", "4") \ .getOrCreate() ```
房源推荐项目总结 作者: nbboy 时间: 2020-03-16 分类: Python 评论 # 接口的定义 先定义一个采样类Sample,主要负责收集采样的信息。 ```python class Sample(metaclass=abc.ABCMeta): """ 采样类 """ # 样本名称 name = '' # 样本类型 type = 0 # 决策者 manager = None def __init__(self, userId): self._userId = userId self._lazy_data = [] @abc.abstractmethod def load(self): """ :parameter userId 加载数据 :return: """ pass @property def items(self): if not self._lazy_data: self._lazy_data = self.load() return self._lazy_data @property def length(self): """ 数据长度 :return: """ return len(self.items) @property def weight(self): """ 数据权重 :return: """ pass @property def workQuota(self): """ 工作配额 :return: """ return int(self.manager.getWorkQuota(self)) ``` 具体的样本种类根据业务的数据来了,比如我的业务涉及到用户推荐房源有关的就用户点单的数据,用户收藏的数据,用户足迹的数据,每个类都可以实例化一个类。 另外一个关键类是SamplePolicyManager,主要负责采样信息的管理和计算采样结果的配额,它的定义如下: ```python class SamplePolicyManager(collections.UserList): """ 采样管理 """ def append(self, item): item.manager = self return super(SamplePolicyManager, self).append(item) def insert(self, i, item): item.manager = self super(SamplePolicyManager, self).insert(i, item) def getWorkQuota(self, sample): """ 根据权重和总数量计算配额 :param sample: :return: """ return sample.weight * TOTAL_RECOMMEND_NUM ``` 因为后面算法需要用到分类,所有我们把这部分概念抽象出来,方便后面灵活添加,创建一个分类器的接口IClassifierInterface。 ```python class IClassifierInterface(metaclass=abc.ABCMeta): """ 特征分类器 """ # 分类数量 CLASSIFY_NUM = 0 # 特征的权重,权重值参与最后的相似度计算,权重约高,影响越大 WEIGHT = 0 NAME = '' @abc.abstractmethod def classify(self, attrs, data): """ 计算分类数据 :parameter attrs 需要分类的数据列 :parameter data 分类好的数据 :return:返回data """ pass @abc.abstractmethod def isDirty(self, attrs): """ 是否脏数据 :param attrs: :return: """ pass def getVectors(self, infos): """ 根据用户的喜好计算出偏好向量 :param infos: :return: """ count = [0] * self.classifyNum vectors = [] for item in infos: count = self.classify(item, count) lenInfo = float(len(infos)) for item in count: vectors.append(item / lenInfo) return vectors @property def classifyNum(self): return self.CLASSIFY_NUM ``` 其实我实际的项目中,分类器有很多,比如价格分类器,面积分类器,户型分类器等等,根据自己的项目需求灵活增加即可。 比如价格分类器,代码如下: ```python class PriceClassifier(IClassifierInterface): """ 价格分类器 """ CLASSIFY_NUM = 13 WEIGHT = 0.2 NAME = 'price' TEN_THOUSAND_RMB = 10000 def isDirty(self, attrs): """ 非法价格,减少曝光 :param attrs: :return: """ return attrs.price is None or attrs.price == 0 def classify(self, attr, data): if attr.price <= 50 * self.TEN_THOUSAND_RMB: data[0] = data[0] + 1 elif 50 * self.TEN_THOUSAND_RMB < attr.price <= 100 * self.TEN_THOUSAND_RMB: data[1] = data[1] + 1 elif 100 * self.TEN_THOUSAND_RMB < attr.price <= 150 * self.TEN_THOUSAND_RMB: data[2] = data[2] + 1 elif 150 * self.TEN_THOUSAND_RMB < attr.price <= 200 * self.TEN_THOUSAND_RMB: data[3] = data[3] + 1 elif 200 * self.TEN_THOUSAND_RMB < attr.price <= 250 * self.TEN_THOUSAND_RMB: data[4] = data[4] + 1 elif 250 * self.TEN_THOUSAND_RMB < attr.price <= 300 * self.TEN_THOUSAND_RMB: data[5] = data[5] + 1 elif 300 * self.TEN_THOUSAND_RMB < attr.price <= 350 * self.TEN_THOUSAND_RMB: data[6] = data[6] + 1 elif 350 * self.TEN_THOUSAND_RMB < attr.price <= 400 * self.TEN_THOUSAND_RMB: data[7] = data[7] + 1 elif 400 * self.TEN_THOUSAND_RMB < attr.price <= 450 * self.TEN_THOUSAND_RMB: data[8] = data[8] + 1 elif 450 * self.TEN_THOUSAND_RMB < attr.price <= 500 * self.TEN_THOUSAND_RMB: data[9] = data[9] + 1 elif 500 * self.TEN_THOUSAND_RMB < attr.price <= 550 * self.TEN_THOUSAND_RMB: data[10] = data[10] + 1 elif 550 * self.TEN_THOUSAND_RMB < attr.price <= 600 * self.TEN_THOUSAND_RMB: data[11] = data[11] + 1 else: data[12] = data[12] + 1 return data ``` 咣咣,请我们的猪脚登场,看具体算法前,我们先看下推荐类的接口定义: ```python class IRecommendInterface(metaclass=abc.ABCMeta): """ 推荐接口 """ houses = None def addClassifier(self, classifier): """ :param classifier: :return: """ self._classifiers.append(classifier) return self def removeClassifier(self, classifier): """ :param classifier: :return: """ self._classifiers.remove(classifier) def __init__(self, houses, aggregation=None): super().__init__() self.houses = houses self._aggregation = aggregation # 分类器集合,分类器需要实现IClassifierInterface self._classifiers = [] self._samples = [] def recommend(self): """ 推荐计算 :param userInfo: :return: """ if self._aggregation is None: raise AssertionError() for sample, loadArgs in self._samples: if sample.length > 0: similarWeight = self._recommend(sample, sample.items) similarWeight = sorted(similarWeight, key=lambda item: item['score'], reverse=True)[sample.workQuota:] self._aggregation.aggregate(sample, similarWeight) return self._aggregation @abc.abstractmethod def _recommend(self, sample, infos): pass def addSample(self, sample, loadArgs=None): loadArgs = loadArgs or [] self._samples.append((sample, loadArgs)) return self @property def aggregation(self): return self._aggregation @aggregation.setter def aggregation(self, value): self._aggregation = value ``` 可以看到后面有一个聚合的方法,为什么需要这个聚合类?原因很简单,因为需要对推荐的数据做再一次的处理。看下聚合接口: ```python class IAggregationRoot(metaclass=abc.ABCMeta): """ 结果聚合处理 """ @abc.abstractmethod def aggregate(self, sample, vectors): pass def __init__(self): self._value = [] def _afterProcess(self, value): return value @property def value(self): return self._afterProcess(self._value) ``` 非常简单,只是做结果的收集,在看推荐过程前,我们先写下测试用例看看,有了上面的类定义,我们用例代码这样写: ```python # 初始化采样决策 policyManager = SamplePolicyManager() userOrderSample = UserOrderSample(userId) # 添加用户点单采样 policyManager.append(userOrderSample) userCollectSample = UserCollectSample(userId) # 添加用户收藏采样 policyManager.append(userCollectSample) userFootSample = UserFootSample(userId) # 添加用户足迹采样 policyManager.append(userFootSample) if not userOrderSample.items and not userCollectSample.items and not userFootSample.items: return # 初始化推荐引擎 cbRecommend = CBRecommend(houses) cbRecommend.aggregation = LimitScoreOrderAggregation(5) cbRecommend \ .addClassifier(PriceClassifier()) \ .addClassifier(SpaceClassifier()) \ .addClassifier(FloorClassifier()) \ .addClassifier(LayoutClassifier()) \ .addClassifier(CommunityClassifier(cityId)) # 获取到推荐相似度向量 similarWeight = cbRecommend \ .addSample(userOrderSample) \ .addSample(userCollectSample) \ .addSample(userFootSample) \ .recommend() print(similarWeight) return similarWeight ``` # 关键类实现 是不是很清晰了?最后只剩下实现推荐接口的CBRecommend实现类没讲解,这个类其实就是正正工作的地方,也就是推荐算法的逻辑,当然我们只要实现接口,完全可以把它换掉。他是这样的: ```python class CBRecommend(IRecommendInterface): """ 基于内容推荐引擎 """ def _cosineSimilarity(self, vectorA, vectorB, lenVectorA): ''' 函数功能:计算两向量的余弦相似度 参数:向量vector_A,vector_B 返回值:两向量的余弦相似度 ''' for i in range(len(vectorB)): if vectorB[i] == 1: index = i break vectorInner = vectorA[index] vectorCos = vectorInner / (lenVectorA) return vectorCos def _recommend(self, sample, infos): similarWeight = [] cachedUserInfos = {} for item in self.houses: weightCos = 0 classifierInfos = [] for classifier in self._classifiers: # 是不是脏数据? if not classifier.isDirty(item): # 这里只是做一个缓存,避免多次计算 if classifier.NAME not in cachedUserInfos: cachedUserInfos[classifier.NAME] = classifier.getVectors(infos) vectors = cachedUserInfos[classifier.NAME] # 分配初始化空军 count = [0] * classifier.classifyNum # 得到分类矩阵 count = classifier.classify(item, count) vectorA = np.sqrt(np.inner(vectors, vectors)) # 获得余弦相似度 sim = self._cosineSimilarity(vectors, count, vectorA) else: sim = -9999 classifierInfos.append({'classifier': classifier, 'score': round(sim * classifier.WEIGHT, 8)}) # 计算总得分 weightCos += sim * classifier.WEIGHT similarWeight.append( {'itemId': item.id, 'item': item, 'score': round(weightCos, 8), 'sample': sample, 'classifiers': classifierInfos}) return similarWeight def __init__(self, houses, aggregation=None): super().__init__(houses, aggregation) ``` 注释我都加上了,每个用户对每套房源都有一个得分,得分最高的推荐给用户就可以了。因为涉及到公司代码,业务相关的代码,我就不展示了,思路就是这个思路,用的推荐算法是基于内容的推荐算法。 # 总结 这个推荐算法当然是一个非常简单的推荐算法,但是效果也还可以,因为是基于用户内容的,所以基本上看过去的数据都是用户想要的。后面,我还会对推荐算法进行讲解,当然不是很专业,也不是我的本职工作,只是业余研究研究。
用scrapy抓取qunar门票信息 作者: nbboy 时间: 2019-11-20 分类: Python 评论 # 概述 水文一篇,本来想把scrapy做成一个系列,不过最近还是比较忙,等有空的时候,就工作中遇到的反爬机制说一下。这次说的是爬取qunar网站的景点信息,并且存储到mongodb中。在说一遍,python做爬虫是真的方便,指的是爬虫的前端部分。 # 准备工作 - 创建项目 ```python scrapy startproject ticket_spiders ``` - 创建爬虫 ```python scrapy genspider qunar qunar.com ``` # 制定爬取规则 通过分析网页我们找到了爬取qunar景区的入口接口,http://piao.qunar.com/ticket/list.json?keyword=浙江®ion&page=151&from=mps_search_suggest ,这里为了做演示,只爬取浙江的景区。起始URL填写为到start_urls中,因为内容返回为JSON的,所以解析步骤相对来说简单多了,直接解析为JSON对象就可以。在做了简单的判断之后,我们把抽取到的数据发往pipeline就可以了。 ```python result = json.loads(response.text) if 'ret' in result and result['ret'] == True: data = result['data'] oldKeyword = data['keyword'] if oldKeyword == '浙江': sightList = data['sightList'] if sightList: for sight in sightList: yield QunarItem(**sight) ``` # 分页规则 爬虫的分页规则每个爬虫都不一样,针对我们今天的爬虫而言,我们只要让他一页一页翻下去就可以了。parse函数里的代码: ```python def parse(self, response): page = response.meta.get('page', 151) self.logger.info('current page {page}'.format(page=page)) result = json.loads(response.text) if 'ret' in result and result['ret'] == True: data = result['data'] oldKeyword = data['keyword'] if oldKeyword == '浙江': sightList = data['sightList'] if sightList: for sight in sightList: yield QunarItem(**sight) nextPage = page + 1 yield scrapy.Request( url='http://piao.qunar.com/ticket/list.json?keyword=%E6%B5%99%E6%B1%9F®ion&from=mps_search_suggest&page=' + str( nextPage), callback=self.parse, meta={'page': nextPage}) else: self.logger.error('被限制爬行了') ``` # 制定处理规则 到了这步,我们已经有数据了,接下来该清洗的清洗,该处理的处理,该存储的存储,这次爬虫目标很简单,就是把数据全部存储起来就可以。 ```python import pymongo mongoClint = pymongo.MongoClient('127.0.0.1', 27017) dbQunar = mongoClint['qunar'] collection = dbQunar['sight'] class QunarPipeline(object): def process_item(self, item, spider): collection.insert_one(item._values) return item ``` # 全局配置 上面那步写好了处理器,但是并没有进行挂接,这步我们进行挂接,在settings.py文件里,更改处理器配置参数如下: ```python ITEM_PIPELINES = { 'ticket_spiders.pipelines.QunarPipeline': 300, } ``` # 运行和结果 ```shell scrapy crawl qunar ``` 执行后就可以得到最终抓取的结果,嗯,挺有满足感的,下面可以用这些数据进行分析或者展示了。  # 总结 scrapy作为抓取引擎既简单又强大,简单是说他其实封装了很多复杂的机制和概念,下次我带领大家从源码角度去了解这个框架。学习一门技术要从根本上,特别是源码角度或者是核心模型角度去了解其实现,才能从瞬息万变的技术变革中以不变应万变,以此和读者共勉。