Overview
- Streaming data is a thriving concept in the field of machine learning
- Learn how to use PySpark and a machine learning model to make predictions on streaming data
- We will introduce streaming data and the basics of Spark Streaming, then dive into an implementation on an industry-relevant dataset
Introduction
Imagine this: every second, more than 8,500 tweets are posted, more than 900 photos are uploaded to Instagram, more than 4,200 Skype calls are made, more than 78,000 Google searches are run, and more than 2 million emails are sent (data from Internet Live Stats).
We are producing data at an unprecedented speed and scale. It is a great time to be working in data science! But with massive amounts of data come equally complicated challenges.
First, how do we collect data at such a large scale? And how do we ensure that, once the data is generated and collected, the machine learning pipeline keeps producing results? These are major challenges facing the industry, and they are why the concept of streaming data is getting more and more attention in enterprises.
Adding the ability to process streaming data will greatly expand your current data science portfolio. It is a skill the industry needs, and mastering it will help you land your next data science role.
So, in this article, we will learn what streaming data is, understand the basics of Spark Streaming, and then implement it with Spark on an industry-relevant dataset.
Table of Contents
- What is streaming data?
- Spark Streaming Basics: Discretized Streams, caching, checkpointing
- Shared variables for streaming data
- Use PySpark for sentiment analysis of streaming data
What is streaming data?
The amount of data generated by social media is mind-boggling. Can you imagine what it takes to store all of it? It is a complicated process! So, before getting into the Spark side of this article, let's understand what streaming data is.
Streaming data has no discrete beginning or end. It is generated every second from thousands of data sources, and it needs to be processed and analyzed as soon as possible. Large amounts of streaming data, such as Google search results, need to be processed in real time.
We know that some insights will be more valuable when an event first occurs, and they will gradually lose value over time. Take sports events as an example – we want to see real-time analysis, real-time statistical insights, and really enjoy the game in that moment, right?
For example, let's say you're watching a thrilling tennis match between Roger Federer and Novak Djokovic.
The match is tied at two sets apiece, and you want to know the percentage of serves Federer has returned on his backhand compared with his career average. Would it be meaningful to see that a few days later, or in the moments before the deciding set begins?
Spark Streaming Basics
Spark Streaming is an extension of the core Spark API that enables scalable and fault-tolerant stream processing of live data streams.
Before we get to the implementation, let's first understand the different components of Spark Streaming.
Discretized Streams
A Discretized Stream (DStream) represents a continuous stream of data. A DStream can be created either directly from a data source or by applying transformations to another DStream.
The first step in building a streaming application is to define the batch duration for which data is collected from the source. If the batch duration is 2 seconds, data is collected every 2 seconds and stored in an RDD. The continuous sequence of these RDDs forms a DStream, which is immutable and can be processed by Spark like any distributed dataset.
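As a rough, minimal sketch (the application name, hostname, and port below are placeholder assumptions, not values from this article), creating a DStream with a 2-second batch duration might look like this:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="DStreamExample")

# every 2 seconds, the records received in that window are grouped into one RDD of the DStream
ssc = StreamingContext(sc, batchDuration=2)

# a DStream whose data is received from a socket source (placeholder hostname and port)
lines = ssc.socketTextStream("localhost", 9999)

# each transformation yields a new DStream; Spark records this lineage of RDDs
words = lines.flatMap(lambda line: line.split(" "))
words.pprint()

ssc.start()
ssc.awaitTermination()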
Consider a typical data science project. In the data preprocessing phase, we need to transform variables, including converting categorical variables into numeric ones, binning values, removing outliers, and much more. Spark maintains a history of all the transformations defined on the data, so whenever a failure occurs, it can retrace that path of transformations and regenerate the computed results.
We want our Spark application to run 24/7, and whenever a failure occurs, we want it to recover as quickly as possible. However, when processing data at large scale, Spark has to recompute all of those transformations after a failure. As you can imagine, this can be very expensive.
Caching
One way to deal with this challenge is to cache, that is, temporarily store, the computed results so that the results of the transformations defined on the data are preserved. That way, when a failure occurs, these transformations do not have to be recomputed over and over again.
DStreams allow the streaming data to be kept in memory. This is useful when we want to perform multiple operations on the same data.
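For example, here is a minimal, hypothetical sketch (the socket source, hostname, and port are assumptions) of persisting a DStream so that several operations can reuse the same in-memory data:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="CachingExample")
ssc = StreamingContext(sc, batchDuration=2)
lines = ssc.socketTextStream("localhost", 9999)

# keep each batch's RDD in memory so that both computations below reuse the same data
lines.cache()

# two separate operations on the same cached stream
lines.count().pprint()                             # number of records per batch
lines.filter(lambda x: 'error' in x).pprint()      # only the records containing 'error'

ssc.start()
ssc.awaitTermination()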
Checkpointing
Caching is very useful when used as intended, but it requires a lot of memory. Not everyone has hundreds of machines with 128 GB of RAM to cache everything. This is where the concept of checkpointing helps.
Checkpointing is another technique for preserving the results of transformed DataFrames. From time to time it saves the state of the running application to a reliable storage medium, such as HDFS. However, it is slower and less flexible than caching.
Checkpointing is useful with streaming data, where the result of a transformation depends on previous results and therefore needs to be retained. We also store checkpoint metadata, such as the configuration used to create the streaming data and the results of a series of DStream operations.
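A minimal sketch of how checkpointing could be wired up is shown below; the checkpoint directory, hostname, and port are placeholder assumptions, and this is not code from the project in this article:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def create_context():
    sc = SparkContext(appName="CheckpointExample")
    ssc = StreamingContext(sc, batchDuration=2)

    # metadata and the state of the stream are written here from time to time
    ssc.checkpoint("hdfs:///tmp/streaming_checkpoints")

    lines = ssc.socketTextStream("localhost", 9999)
    lines.pprint()
    return ssc

# after a failure, the context is rebuilt from the checkpoint directory instead of from scratch
ssc = StreamingContext.getOrCreate("hdfs:///tmp/streaming_checkpoints", create_context)
ssc.start()
ssc.awaitTermination()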
Shared variables for streaming data
Sometimes we define functions such as map, reduce, or filter for a Spark application that have to be executed on multiple machines in a cluster. The variables used inside these functions are copied to each machine (executor).
Because each machine has its own executor working on its own copy, we need something that can relate the values of these variables across machines.
For example, suppose a Spark application runs on 100 different executors, each capturing Instagram images posted by people from different countries.
Each executor computes results on the data held on its particular machine, but we need something that lets these executors communicate so we can obtain an aggregated result. In Spark, shared variables let us overcome this problem.
Accumulator variables
Use cases include counting the number of errors that occurred, the number of blank logs, or the number of requests received from a particular country; all of these can be solved with an accumulator.
The executors on each worker node send their data back to the driver process, which updates the value of the accumulator variable. Accumulators are only suitable for operations that are associative and commutative. For example, they work well for sums and maximums, but an average will not.
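As a small standalone illustration (not part of this article's project; the data is made up), an accumulator can count blank records across all the executors:
from pyspark import SparkContext

sc = SparkContext(appName="AccumulatorExample")

# a counter that lives on the driver; tasks running on the executors can only add to it
blank_records = sc.accumulator(0)

logs = sc.parallelize(["ok", "", "ok", "", ""])

# each executor adds its local count, and the driver sees the aggregated total
logs.foreach(lambda line: blank_records.add(1) if len(line) == 0 else None)

print(blank_records.value)   # 3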
Broadcast variables
When we use location data, such as a mapping of city names to postal codes, these are fixed variables, right? Now, if every transformation on every executor needs this data, we should not have to request it from the driver each time, as that would be far too expensive.
Instead, a copy of this data can be stored on each machine. These kinds of variables are called broadcast variables.
Broadcast variables allow the programmer to keep a read-only variable cached on each machine. Spark normally distributes broadcast variables using efficient broadcast algorithms, but we can also define them ourselves when tasks across multiple stages need the same data.
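Here is a small standalone sketch (the city-to-postal-code values are made up for illustration) of how a broadcast variable keeps such a lookup table on every machine:
from pyspark import SparkContext

sc = SparkContext(appName="BroadcastExample")

# a read-only lookup table shipped once to every executor
city_to_zip = sc.broadcast({"Mumbai": "400001", "Delhi": "110001"})

cities = sc.parallelize(["Mumbai", "Delhi", "Mumbai"])

# executors read their local broadcast copy instead of asking the driver for the mapping
zips = cities.map(lambda city: city_to_zip.value.get(city, "unknown"))

print(zips.collect())   # ['400001', '110001', '400001']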
Use PySpark for sentiment analysis of streaming data
It's time to fire up your favorite IDE! Let's write some code in this section and get a practical understanding of streaming data.
Understanding the problem statement
In this section we will use a real-world dataset. Our goal is to detect hate speech in tweets. For simplicity, we consider a tweet to contain hate speech if it expresses racist or sexist sentiment.
So the task is to distinguish racist or sexist tweets from other tweets. We will use a training sample of tweets and labels, where label "1" means the tweet is racist/sexist and label "0" means it is not.
Why is this a relevant project? Because social media platforms receive enormous amounts of streaming data in the form of comments and status updates. This project will help us moderate publicly posted content.
Set up the project workflow
- Model building: build a logistic regression model pipeline to classify whether a tweet contains hate speech. Here, our focus is not on building a perfectly accurate classification model, but on understanding how to use an arbitrary model on streaming data and return results
- Initialize the Spark Streaming environment: once the model is built, define the hostname and port number from which to receive the streaming data
- Stream the data: next, tweets are sent from a netcat server on the defined port, and the Spark Streaming API receives the data after the specified batch duration
- Predict and return the results: once a tweet is received, the data is passed into the machine learning pipeline we created, and the predicted sentiment is returned by the model
That is a concise description of the project workflow.
Training data to build a logistic regression model
We store the tweet data and the corresponding labels in a CSV file. A logistic regression model is used to predict whether a tweet contains hate speech: if it does, the model predicts the label as 1 (otherwise 0). You can refer to "PySpark for Beginners" to set up your Spark environment.
You can download the dataset and code here.
First, you need to define the schema of the CSV file; otherwise, Spark will treat every column as a string. Read the data and check that the schema matches the definition:
# importing required libraries
import sys  # used later to read the hostname and port from the command line
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
import pyspark.sql.types as tp
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row

# initializing spark session
sc = SparkContext(appName="PySparkShell")
spark = SparkSession(sc)

# define the schema
my_schema = tp.StructType([
    tp.StructField(name='id', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='label', dataType=tp.IntegerType(), nullable=True),
    tp.StructField(name='tweet', dataType=tp.StringType(), nullable=True)
])

# read the dataset
my_data = spark.read.csv('twitter_sentiments.csv',
                         schema=my_schema,
                         header=True)

# view the data
my_data.show(5)

# print the schema of the file
my_data.printSchema()
Define the stages of the machine learning pipeline
Now that the data is held in a Spark DataFrame, we need to define the different stages that will transform the data, and then use them to get the predicted label from the model.
In the first stage, a RegexTokenizer converts the tweet text into a list of words. Then we remove the stop words from that list and create word vectors. In the final stage, these word vectors are used to build the logistic regression model and obtain the predicted sentiment.
Remember - the point is not to build a completely accurate classification model, but to see how to use prediction models on streaming data to get results.
# define stage 1: tokenize the tweet text
stage_1 = RegexTokenizer(inputCol='tweet', outputCol='tokens', pattern='\\W')
# define stage 2: remove the stop words
stage_2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')
# define stage 3: create a word vector of size 100
stage_3 = Word2Vec(inputCol='filtered_words', outputCol='vector', vectorSize=100)
# define stage 4: logistic regression model
model = LogisticRegression(featuresCol='vector', labelCol='label')
Set up the machine learning pipeline
Let's add the stages to a Pipeline object, which will perform these transformations in order. We fit the pipeline on the training dataset; now, whenever a new tweet arrives, we only need to pass it through the pipeline object to transform the data and get the prediction:
# setup the pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, model])
# fit the pipeline model with the training data
pipelineFit = pipeline.fit(my_data)
Stream data and return the results
Assuming hundreds of comments are received per second, we want to keep the platform clean by preventing users from posting hate speech. So whenever we receive a new text, we pass it into the pipeline and get the predicted sentiment.
We will define a function get_prediction that removes blank sentences and creates a DataFrame in which each row contains a tweet.
Then we initialize the Spark Streaming context and define a batch duration of 3 seconds. This means we will make predictions on the data we receive every 3 seconds:
# define a function to compute sentiments of the received tweets
def get_prediction(tweet_text):
    try:
        # filter the tweets whose length is greater than 0
        tweet_text = tweet_text.filter(lambda x: len(x) > 0)
        # create a dataframe with column name 'tweet' and each row will contain the tweet
        rowRdd = tweet_text.map(lambda w: Row(tweet=w))
        # create a spark dataframe
        wordsDataFrame = spark.createDataFrame(rowRdd)
        # transform the data using the pipeline and get the predicted sentiment
        pipelineFit.transform(wordsDataFrame).select('tweet', 'prediction').show()
    except:
        print('No data')

# initialize the streaming context
ssc = StreamingContext(sc, batchDuration=3)

# Create a DStream that will connect to hostname:port, like localhost:9991
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))

# split the tweet text by the keyword 'TWEET_APP' so that we can identify which set of words is from a single tweet
words = lines.flatMap(lambda line: line.split('TWEET_APP'))

# get the predicted sentiments for the tweets received
words.foreachRDD(get_prediction)

# Start the computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()
Run the program in one terminal and use Netcat (a utility for sending data to a defined hostname and port number). You can start a TCP connection with the following command:
nc -lk port_number
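For example, assuming the code above is saved as spark_streaming_sentiment.py (a hypothetical file name) and using the localhost:9991 placeholder from the code comment earlier, the two terminals might look like this:
# terminal 1: start Netcat listening on the chosen port
nc -lk 9991

# terminal 2: launch the streaming application, passing the hostname and port it should read from
spark-submit spark_streaming_sentiment.py localhost 9991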
Finally, type text into the Netcat terminal and you will see the predictions appear in real time in the terminal running the Spark application.
Perfect!
Conclusion
Streaming data will only become more and more popular in the next few years, so you should really start to get familiar with this topic. Remember, data science is not just about building models—the entire process needs attention.
This article introduced the basics of Spark Streaming and how to implement it on a real-world dataset. I encourage you to take another dataset, or collect live data, and implement what we just covered (you can also try out other models).
Looking forward to listening to your feedback and ideas on this article in the comment section below.