全文共6787字,預計學習時長20分鐘
來源:Pexels
概述
- 流數據是一個在機器學習領域蓬勃發展的概念
- 學習如何使用PySpark來利用機器學習模型對流數據進行預測
- 我們將介紹流數據和Spark Streaming的基礎知識,然後深入到實現部分
引言
想像一下——每一秒都有8,500多條推文發布,900多張照片被上傳到Instagram,4,200多個Skype呼叫,78,000多次Google搜索,以及200多萬封電子郵件被發送(數據來自InternetLive Stats)。
我們正在以前所未有的速度和規模生產數據。這是在數據科學領域工作的大好時候!但是有了大量的數據後,接踵而至的是複雜的挑戰。
首要,如何收集大規模的數據?如何確保一旦生成並收集數據,機器學習管道就會繼續產生結果?這些都是業界面臨的重大挑戰,以及為什麼流數據的概念在企業中越來越受到關注。
增加處理流數據的能力將極大地擴展當前的數據科學產品投資組合。這是業界急需的技能,若能熟練掌握它,將幫助你擔負起下一個數據科學角色。
因此,在本文中,我們將學習什麼是流數據,了解Spark Streaming的基礎知識,然後在一個業界相關的數據集上使用spark實現流數據。
目錄
1. 什麼是流數據?
2. Spark Streaming的基礎知識
3. 離散流
4. 緩存
5. 檢查點
6. 流數據的共享變數
7. 累加器變數
8. 廣播變數
9. 使用PySpark對流數據進行情感分析
什麼是流數據?
社交媒體產生的數據是驚人的。你敢於想像存儲所有數據需要些什麼嗎?這是一個複雜的過程!因此,在深入探討本文的Spark方面之前,先來理解什麼是流數據。
流數據沒有離散的開始或結束。這些數據是每秒從數千個數據源中生成的,它們需要儘快進行處理和分析。大量流數據需要實時處理,例如Google搜索結果。
我們知道,在事件剛發生時一些見解會更有價值,而隨著時間的流逝它們會逐漸失去價值。以體育賽事為例——我們希望看到即時分析,即時統計見解,在那一刻真正享受比賽,對吧?
例如,假設你正在觀看一場羅傑·費德勒(Roger Federer)對戰諾瓦克·喬科維奇(Novak Djokovic)的激動人心的網球比賽。
這場比賽兩局打平,你想了解與費德勒的職業平均水平相比,其反手發球的百分比。是在幾天之後看到有意義,還是在決勝局開始前的那一刻看到有意義呢?
來源:Pexels
Spark Streaming的基礎知識
Spark Streaming是核心Spark API的擴展,可實現實時數據流的可伸縮和容錯流處理。
在轉到實現部分之前,先了解一下Spark Streaming的不同組成部分。
離散流
離散流(Dstream)是一個連續的數據流。對於離散流,其數據流可以直接從數據源接收,也可以在對原始數據進行一些處理後接收。
構建流應用程序的第一步是定義要從中收集數據的數據資源的批處理持續時間。如果批處理持續時間為2秒,則將每2秒收集一次數據並將其存儲在RDD中。這些RDD的連續序列鏈是一個DStream,它是不可變的,可以通過Spark用作一個分散式數據集。
考慮一個典型的數據科學項目。在數據預處理階段,我們需要轉換變數,包括將分類變數轉換為數字變數,創建分箱,去除異常值和很多其他的事。Spark保留了在數據上定義的所有轉換的歷史記錄。因此,無論何時發生故障,它都可以追溯轉換的路徑並重新生成計算結果。
我們希望Spark應用程序7 x 24小時持續運行。並且每當故障發生時,我們都希望它能儘快恢復。但是,在大規模處理數據的同時,Spark需要重新計算所有轉換以防出現故障。可以想像,這樣做的代價可能會非常昂貴。
緩存
這是應對該挑戰的一種方法。我們可以暫時存儲已計算(緩存)的結果,以維護在數據上定義的轉換的結果。這樣,當發生故障時,就不必一次又一次地重新計算這些轉換。
DStreams允許將流數據保留在內存中。當我們要對同一數據執行多種運算時,這很有用。
檢查點
高速緩存在正常使用時非常有用,但是它需要大量內存。並不是每個人都有數百台具有128 GB內存的計算機來緩存所有內容。
檢查點的概念能夠有所幫助。
檢查點是另一種保留轉換後的數據框結果的技術。它將不時地將正在運行的應用程序的狀態保存在任何可靠的存儲介質(如HDFS)上。但是,它比緩存慢,靈活性也更差。
在擁有流數據時可以使用檢查點。轉換結果取決於先前的轉換結果,並且需要保存以供使用。此外,我們還存儲檢查點元數據信息,例如用於創建流數據的配置以及一系列DStream操作的結果等。
流數據的共享變數
有時候需要為必須在多個集群上執行的Spark應用程序定義諸如map,reduce或filter之類的函數。在函數中使用的變數會被複制到每台機器(集群)中。
在這種情況下,每個集群都有一個不同的執行器,我們想要一些可以賦予這些變數之間關係的東西。
例如:假設Spark應用程序在100個不同的集群上運行,它們捕獲了來自不同國家的人發布的Instagram圖片。
現在,每個集群的執行者將計算該特定集群上的數據的結果。但是我們需要一些幫助這些集群進行交流的東西,以便獲得匯總結果。在Spark中,我們擁有共享變數,這些變數使此問題得以克服。
累加器變數
用例包括發生錯誤的次數,空白日誌的數量,我們從特定國家收到請求的次數——所有這些都可以使用累加器解決。
每個集群上的執行程序將數據發送回驅動程序進程,以更新累加器變數的值。 累加器僅適用於關聯和可交換的運算。例如,對求和和求最大值有用,而求平均值不起作用。
廣播變數
當我們使用位置數據(例如城市名稱和郵政編碼的映射)時,這些是固定變數,是吧?現在,如果每次在任意集群上的特定轉換都需要這種類型的數據,我們不需要向驅動程序發送請求,因為它會太昂貴。
相反,可以在每個集群上存儲此數據的副本。這些類型的變數稱為廣播變數。
廣播變數允許程序員在每台計算機上保留一個只讀變數。通常,Spark使用高效的廣播演算法自動分配廣播變數,但是如果有任務需要多個階段的相同數據,也可以定義它們。
使用PySpark對流數據進行情感分析
是時候啟動你最喜歡的IDE了!讓我們在本節中進行編碼,並以實踐的方式理解流數據。
理解問題陳述
在本節我們將使用真實數據集。我們的目標是檢測推文中的仇恨言論。為了簡單起見,如果一條推文包含帶有種族主義或性別歧視情緒的言論,我們就認為該推文包含仇恨言論。
因此,任務是將種族主義或性別歧視的推文從其他推文中區分出來。我們將使用包含推文和標籤的訓練樣本,其中標籤「1」表示推文是種族主義/性別歧視的,標籤「0」則表示其他種類。
來源:TechCrunch
為什麼這是一個與主題相關的項目?因為社交媒體平台以評論和狀態更新的形式接收龐大的流數據。該項目將幫助我們審核公開發布的內容。
設置項目工作流程
1. 模型構建:構建邏輯回歸模型管道,對推文中是否包含仇恨言論進行分類。在這裡,我們的重點不是建立一個完全準確的分類模型,而是了解如何在流數據上使用任意模型並返回結果
2. 初始化Spark Streaming的環境:一旦模型構建完成,需要定義獲取流數據的主機名和埠號
3. 流數據:接下來,從定義的埠添加來自netcat伺服器的推文,SparkStreaming API將在指定的持續時間後接收數據
4. 預測並返回結果:一旦接收到推文,就將數據傳遞到創建的機器學習管道中,並從模型中返回預測的情緒
這是對工作流程的簡潔說明:
訓練數據以建立邏輯回歸模型
我們在一個csv文件中存儲推文數據及其相應的標籤。使用邏輯回歸模型來預測推文是否包含仇恨言論。如果是,則模型預測標籤為1(否則為0)。你可以參考「面向初學者的PySpark」來設置Spark環境。
可以在這裡下載數據集和代碼。
首先,需要定義CSV文件的模式。否則,Spark會將每列數據的類型都視為字元串。讀取數據並檢查模式是否符合定義:
- # importing required libraries
- from pyspark import SparkContext
- from pyspark.sql.session import SparkSession
- from pyspark.streaming import StreamingContext
- import pyspark.sql.types as tp
- from pyspark.ml import Pipeline
- from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
- from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
- from pyspark.ml.classification import LogisticRegression
- from pyspark.sql import Row
- # initializing spark session
- sc = SparkContext(appName="PySparkShell")
- spark = SparkSession(sc)
- # define the schema
- my_schema = tp.StructType([
- tp.StructField(name='id', dataType= tp.IntegerType(), nullable=True),
- tp.StructField(name='label', dataType= tp.IntegerType(), nullable=True),
- tp.StructField(name='tweet', dataType= tp.StringType(), nullable=True)
- ])
- # read the dataset
- my_data = spark.read.csv('twitter_sentiments.csv',
- schema=my_schema,
- header=True)
- # view the data
- my_data.show(5)
- # print the schema of the file
- my_data.printSchema()
定義機器學習管道的各個階段
現在已經將數據保存在Spark數據框中,需要定義轉換數據的不同階段,然後使用它從模型中獲取預測的標籤。
在第一階段,使用RegexTokenizer將推特文本轉換為單詞列表。然後,從單詞列表中刪除停用詞並創建詞向量。在最後階段,使用這些詞向量來構建邏輯回歸模型並獲得預測的情緒。
記住——重點不是建立一個完全準確的分類模型,而是要看看如何在流數據上使用預測模型來獲取結果。
- # define stage 1: tokenize the tweet text
- stage_1 = RegexTokenizer(inputCol='tweet' , outputCol='tokens', pattern='\\W')
- # define stage 2: remove the stop words
- stage_2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')
- # define stage 3: create a word vector of the size 100
- stage_3 = Word2Vec(inputCol='filtered_words', outputCol='vector', vectorSize=100)
- # define stage 4: Logistic Regression Model
- model = LogisticRegression(featuresCol='vector', labelCol='label')
設置機器學習管道
讓我們在Pipeline對象中添加階段,然後按順序執行這些轉換。用訓練數據集擬合管道,現在,每當有了新的推文,只需要將其傳遞給管道對象並轉換數據即可獲取預測:
- # setup the pipeline
- pipeline = Pipeline(stages= [stage_1, stage_2, stage_3, model])
- # fit the pipeline model with the training data
- pipelineFit = pipeline.fit(my_data)
流數據和返回結果
假設每秒收到數百條評論,我們希望通過阻止用戶發布仇恨言論來保持平台整潔。因此,每當我們收到新文本,都會將其傳遞到管道中並獲得預測的情緒。
我們將定義一個函數get_prediction,該函數將刪除空白句子並創建一個數據框,其中每一行都包含一條推文。
初始化Spark Streaming的環境並定義3秒的批處理持續時間。這意味著我們將對每3秒收到的數據進行預測:
- # define a function to compute sentiments of the received tweets
- defget_prediction(tweet_text):
- try:
- # filter the tweets whose length is greater than 0
- tweet_text = tweet_text.filter(lambda x: len(x) >0)
- # create a dataframe with column name 'tweet' and each row will contain the tweet
- rowRdd = tweet_text.map(lambda w: Row(tweet=w))
- # create a spark dataframe
- wordsDataFrame = spark.createDataFrame(rowRdd)
- # transform the data using the pipeline and get the predicted sentiment
- pipelineFit.transform(wordsDataFrame).select('tweet','prediction').show()
- except :
- print('No data')
- # initialize the streaming context
- ssc = StreamingContext(sc, batchDuration=3)
- # Create a DStream that will connect to hostname:port, like localhost:9991
- lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
- # split the tweet text by a keyword 'TWEET_APP' so that we can identify which set of words is from a single tweet
- words = lines.flatMap(lambda line : line.split('TWEET_APP'))
- # get the predicted sentiments for the tweets received
- words.foreachRDD(get_prediction)
- # Start the computation
- ssc.start()
- # Wait for the computation to terminate
- ssc.awaitTermination()
在一個終端上運行該程序,然後使用Netcat(用於將數據發送到定義的主機名和埠號的實用工具)。你可以使用以下命令啟動TCP連接:
- nc -lk port_number
最後,在第二個終端中鍵入文本,你將在另一個終端中實時獲得預測。
完美!
結語
流數據在未來幾年只會越來越熱門,因此應該真正開始熟悉這一主題。請記住,數據科學不只是建立模型——整個流程都需要關注。
本文介紹了SparkStreaming的基礎知識以及如何在真實的數據集上實現它。我鼓勵大家使用另一個數據集或抓取實時數據來實現剛剛介紹的內容(你也可以嘗試其他模型)。
期待在下面的評論區聽取你對本文的反饋以及想法。
留言點贊關注
我們一起分享AI學習與發展的乾貨
如轉載,請後台留言,遵守轉載規範