
The full text is about 6,787 words; the estimated reading time is 20 minutes.

Source: Pexels

Overview

  • Streaming data is a concept that is flourishing in the field of machine learning
  • Learn how to use PySpark to predict streaming data using machine learning models
  • We will introduce streaming data and the basics of Spark Streaming, and then dive into the implementation on an industry-relevant dataset

Introduction

Imagine this: every second, more than 8,500 tweets are posted, more than 900 photos are uploaded to Instagram, more than 4,200 Skype calls are made, more than 78,000 Google searches are run, and more than 2 million emails are sent (data from Internet Live Stats).

We are producing data at an unprecedented pace and scale. It is a great time to be working in data science! But with massive amounts of data come complex challenges.

First, how do you collect data at this scale? And how do you ensure that, once the data is generated and collected, the machine learning pipeline keeps producing results? These are major challenges facing the industry, and they are why the concept of streaming data is gaining more and more attention in enterprises.

Adding the ability to process streaming data will greatly expand your current data science portfolio. It is a skill the industry demands, and mastering it will help you land your next data science role.

So, in this article, we will learn what streaming data is, understand the basics of Spark Streaming, and then implement it with Spark on an industry-relevant dataset.

Table of Contents

  • What is streaming data?
  • Spark Streaming basics
      • Discretized Streams (DStreams)
      • Caching
      • Checkpointing
  • Shared variables for streaming data
      • Accumulator variables
      • Broadcast variables
  • Use PySpark for sentiment analysis of streaming data

What is streaming data?

The amount of data generated by social media is mind-boggling. Can you imagine what it takes to store all of it? It is a complicated process! So, before diving into the Spark side of this article, let's understand what streaming data is.

Streaming data has no discrete beginning or end. It is generated every second from thousands of data sources, and it needs to be processed and analyzed as soon as possible. Huge volumes of streaming data have to be processed in real time, Google search results being one example.

We know that some insights are most valuable right after an event occurs, and they gradually lose value over time. Take a sporting event as an example: we want real-time analysis and real-time statistical insights so we can truly enjoy the game in that moment, right?

For example, let's say you're watching a thrilling tennis match between Roger Federer and Novak Djokovic.

The match is tied at two sets all, and you want to know the percentage of serves Federer has returned on his backhand compared with his career average. Would it be meaningful to see that a few days later, or in the moments before the deciding set begins?

Source: Pexels

Spark Streaming Basics

Spark Streaming is an extension of the core Spark API that enables scalable, fault-tolerant stream processing of live data streams.

Before we get to the implementation section, let's first understand the different components of Spark Streaming.

Discretized Streams (DStreams)

A Discretized Stream (DStream) represents a continuous stream of data. A DStream can either be received directly from a data source or be produced by processing another data stream.

The first step in building a streaming application is to define the batch duration for the data source we want to collect data from. If the batch duration is 2 seconds, data is collected every 2 seconds and stored in an RDD. The continuous sequence of these RDDs is a DStream, which is immutable and can be used by Spark as a distributed dataset.
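To make this concrete, here is a minimal standalone sketch of creating a DStream with a 2-second batch duration; the app name, host, and port are illustrative and not part of the article's project code:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="DStreamBasics")       # illustrative app name
    ssc = StreamingContext(sc, batchDuration=2)      # collect incoming data into a new RDD every 2 seconds
    lines = ssc.socketTextStream("localhost", 9999)  # a DStream: the continuous sequence of these RDDs

    # each transformation produces a new, immutable DStream
    words = lines.flatMap(lambda line: line.split(" "))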

Consider a typical data science project. During the data preprocessing stage, we need to transform variables: encode categorical variables as numeric ones, create bins, remove outliers, and plenty of other things. Spark keeps a history of all the transformations defined on the data, so whenever a failure occurs it can retrace the path of those transformations and regenerate the computed results.

We want our Spark application to run 24 x 7, and whenever a failure occurs we want it to recover as quickly as possible. However, when processing data at a large scale, Spark would need to recompute all of those transformations after a failure. As you can imagine, this can be very expensive.

Caching

Caching is one way to deal with this challenge. We can cache the computed results temporarily to preserve the results of the transformations defined on the data. That way, when a failure occurs, those transformations do not have to be recomputed over and over again.

DStreams allow us to keep the streaming data in memory. This is useful when we want to perform multiple operations on the same data.
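As a quick sketch, reusing the illustrative lines DStream from above, persisting the stream is a one-line call:

    # keep the received stream in memory so repeated operations can reuse it
    lines.cache()

    # or choose an explicit storage level instead of cache():
    # from pyspark import StorageLevel
    # lines.persist(StorageLevel.MEMORY_ONLY)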

Checkpointing

Caching is very useful when used appropriately, but it requires a lot of memory, and not everyone has hundreds of machines with 128 GB of RAM to cache everything. This is where the concept of checkpointing helps.

Checkpointing is another technique for preserving the results of transformed data frames. It periodically saves the state of the running application to a reliable storage medium such as HDFS. However, it is slower and less flexible than caching.

We can use checkpoints when we have streaming data, where the result of a transformation depends on previous results and needs to be retained for later use. In addition to the data itself, Spark stores checkpoint metadata, such as the configuration used to create the streaming data and the results of a series of DStream operations.
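A minimal sketch of enabling checkpointing on the illustrative streaming context from earlier; the HDFS directory shown is purely illustrative:

    # periodically save data and metadata checkpoints to reliable storage
    ssc.checkpoint("hdfs:///user/spark/streaming_checkpoints")  # illustrative path

Stateful operations such as updateStateByKey actually require a checkpoint directory to be set before the stream is started.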

Shared variables for streaming data

Sometimes we define functions such as map, reduce, or filter for a Spark application that has to be executed across multiple clusters. The variables used in these functions are copied to each of the machines (clusters).

Here, each cluster has a different executor, and we need something that can relate these variables to one another.

For example, suppose our Spark application runs on 100 different clusters, capturing Instagram images posted by people from different countries.

Now, each cluster's executor computes results on the data residing on that particular cluster, but we need something that lets these clusters communicate so that we can get an aggregated result. In Spark, shared variables let us overcome this problem.

Accumulator variables

Use cases include counting the number of errors that occurred, the number of blank logs, or the number of requests received from a particular country: all of these can be solved with an accumulator.

Executors on each cluster send data back to the driver process to update the value of the accumulator variable. Accumulators only support operations that are associative and commutative; for example, sum and maximum work, but a mean does not.
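As a sketch of an accumulator, reusing the illustrative sc and lines objects from the earlier snippets (the blank-line counter is invented for illustration):

    # a driver-side counter that executors can only add to
    blank_lines = sc.accumulator(0)

    def count_blank(line):
        if len(line.strip()) == 0:
            blank_lines.add(1)  # associative, commutative update, aggregated on the driver
        return line

    lines.map(count_blank).pprint()
    # blank_lines.value can be read on the driver once batches have been processed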

Broadcast variables

When we work with location data, such as a mapping of city names to postal codes, these are fixed variables, right? Now, if every transformation on every cluster had to request this data from the driver, it would be far too expensive.


Instead, a copy of this data can be stored on each cluster. Variables of this type are called broadcast variables.

Broadcast variables allow the programmer to keep a read-only variable cached on each machine. Spark usually distributes broadcast variables automatically using efficient broadcast algorithms, but we can also define them ourselves when tasks across multiple stages need the same data.
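A sketch of a broadcast variable; the city-to-postal-code mapping below is invented purely for illustration:

    # ship one read-only copy of the lookup table to every executor
    city_to_zip = sc.broadcast({"Mumbai": "400001", "New Delhi": "110001"})  # illustrative data

    def add_zip(city):
        # executors read the broadcast value locally instead of asking the driver each time
        return (city, city_to_zip.value.get(city, "unknown"))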

Use PySpark for sentiment analysis of streaming data

It's time to fire up your favorite IDE! Let's write some code in this section and get a practical understanding of streaming data.

Understanding the Problem Statement

In this section we will use a real dataset. Our goal is to detect hate speech in tweets. For simplicity, we say a tweet contains hate speech if it expresses a racist or sexist sentiment.

So the task is to separate racist or sexist tweets from the rest. We will use a training sample of tweets and labels, where label "1" denotes a racist/sexist tweet and label "0" denotes everything else.

Source: TechCrunch

Why is this a relevant project? Because social media platforms receive enormous amounts of streaming data in the form of comments and status updates. This project will help us moderate publicly posted content.

Setting up the project workflow

Here is a concise description of the workflow:

  1. Model building: build a logistic regression model pipeline to classify whether or not a tweet contains hate speech. Here, our focus is not on building a perfectly accurate classification model, but on understanding how to use an arbitrary model on streaming data and return results
  2. Initialize the Spark Streaming environment: once the model is built, define the host name and port number from which to fetch the streaming data
  3. Stream the data: next, add tweets from a netcat server on the defined port; the Spark Streaming API will receive the data after the specified batch duration
  4. Predict and return the results: once a tweet is received, pass the data into the machine learning pipeline we created and return the predicted sentiment from the model

Train a logistic regression model on the data

We store the tweet data and its corresponding labels in a CSV file, and use a logistic regression model to predict whether a tweet contains hate speech. If it does, the model predicts the label as 1 (otherwise 0). You can refer to "PySpark for Beginners" to set up your Spark environment.

You can download the dataset and code here.

First, we need to define the schema of the CSV file; otherwise, Spark will treat every column as a string. Read the data and check that the schema matches the definition:

    # importing required libraries
    from pyspark import SparkContext
    from pyspark.sql.session import SparkSession
    from pyspark.streaming import StreamingContext
    import pyspark.sql.types as tp
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
    from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.sql import Row
    import sys  # used later to read the host name and port number from the command line

    # initializing spark session
    sc = SparkContext(appName="PySparkShell")
    spark = SparkSession(sc)

    # define the schema
    my_schema = tp.StructType([
        tp.StructField(name='id', dataType=tp.IntegerType(), nullable=True),
        tp.StructField(name='label', dataType=tp.IntegerType(), nullable=True),
        tp.StructField(name='tweet', dataType=tp.StringType(), nullable=True)
    ])

    # read the dataset
    my_data = spark.read.csv('twitter_sentiments.csv',
                             schema=my_schema,
                             header=True)

    # view the data
    my_data.show(5)

    # print the schema of the file
    my_data.printSchema()

Define the stages of the machine learning pipeline

Now that the data is stored in a Spark DataFrame, we need to define the different stages that will transform the data, and then use them to get the predicted label from the model.

In the first stage, we use RegexTokenizer to convert the tweet text into a list of words. Then we remove the stop words from that list and create word vectors. In the final stage, these word vectors are used to build the logistic regression model and obtain the predicted sentiment.

Remember, the point is not to build a perfectly accurate classification model, but to see how to use a predictive model on streaming data to get results.

    # define stage 1: tokenize the tweet text
    stage_1 = RegexTokenizer(inputCol='tweet', outputCol='tokens', pattern='\\W')
    # define stage 2: remove the stop words
    stage_2 = StopWordsRemover(inputCol='tokens', outputCol='filtered_words')
    # define stage 3: create a word vector of size 100
    stage_3 = Word2Vec(inputCol='filtered_words', outputCol='vector', vectorSize=100)
    # define stage 4: logistic regression model
    model = LogisticRegression(featuresCol='vector', labelCol='label')

Set up the machine learning pipeline

Let's add the stages to a Pipeline object, which will perform these transformations in order. Fit the pipeline on the training dataset. Now, whenever we have a new tweet, we only need to pass it through the pipeline object and transform the data to get the prediction:

    # set up the pipeline
    pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, model])
    # fit the pipeline model with the training data
    pipelineFit = pipeline.fit(my_data)

Stream the data and return the results

Suppose we receive hundreds of comments per second and we want to keep the platform clean by blocking users who post hate speech. So, whenever we receive new text, we pass it into the pipeline and get the predicted sentiment.

We will define a function get_prediction that removes blank sentences and creates a DataFrame in which each row contains a tweet.

Then we initialize the Spark Streaming environment and define a batch duration of 3 seconds, which means we make predictions on the data received every 3 seconds:

    # define a function to compute sentiments of the received tweets
    def get_prediction(tweet_text):
        try:
            # filter the tweets whose length is greater than 0
            tweet_text = tweet_text.filter(lambda x: len(x) > 0)
            # create a dataframe with column name 'tweet'; each row will contain a tweet
            rowRdd = tweet_text.map(lambda w: Row(tweet=w))
            # create a spark dataframe
            wordsDataFrame = spark.createDataFrame(rowRdd)
            # transform the data using the pipeline and get the predicted sentiment
            pipelineFit.transform(wordsDataFrame).select('tweet', 'prediction').show()
        except:
            print('No data')

    # initialize the streaming context
    ssc = StreamingContext(sc, batchDuration=3)
    # create a DStream that will connect to hostname:port, like localhost:9991
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    # split the tweet text by the keyword 'TWEET_APP' so that we can identify which set of words is from a single tweet
    words = lines.flatMap(lambda line: line.split('TWEET_APP'))
    # get the predicted sentiments for the tweets received
    words.foreachRDD(get_prediction)
    # start the computation
    ssc.start()
    # wait for the computation to terminate
    ssc.awaitTermination()

Run the program in a terminal and use netcat (a utility for sending data to a defined host name and port number). You can start a TCP connection with the following command:

    nc -lk port_number
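Then, assuming the code above is saved as a script named spark_streaming_sentiment.py (the file name and port number are illustrative), launch it in a second terminal, passing the same host name and port number as arguments:

    spark-submit spark_streaming_sentiment.py localhost 9991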

Finally, type text into the netcat terminal, and you will see the predictions appear in real time in the terminal running the Spark application.

Perfect!

Conclusion

Streaming data is only going to become more popular in the coming years, so you should really start getting familiar with this topic. Remember, data science is not just about building models; the whole pipeline deserves attention.

This article introduced the basics of Spark Streaming and how to implement it on a real dataset. I encourage you to take another dataset, or collect live data, and implement what we just covered (you can try a different model as well).

I look forward to hearing your feedback and ideas on this article in the comments section below.
