Tweets in the Stream
Learning Apache Spark Streaming in Java with the Apache Bahir Twitter API.
Introduction
This is the second of three articles sharing my experience learning Apache Spark.
In this tutorial I will help you tap a Twitter feed using an API. You will:
- Test a simple Apache Spark Streaming application from the Apache Spark examples.
- Obtain Twitter credentials and access a stream of Tweets using the Bahir Spark Twitter API.
Audience
This tutorial is for Java programmers who are learning Apache Spark and Big Data and want to work through the Twitter Spark Streaming API. It is accessible to all levels, and no Spark or Hadoop experience is necessary. There are many Scala tutorials for Spark; this one is for Java.
Read my earlier article for more on beginning with Apache Spark for Java:
https://medium.com/telegraph-hill-software/starting-the-spark-4ccc7e82ca93
Outline
This tutorial covers Spark Streaming, specifically building and running a Java Spark application. I start with an example application from the Apache Spark project and walk through building and running it. After testing the base application, I add code to read the Twitter feed using the Bahir Spark Streaming extension. Here is an example of the Twitter feed I will help you output to the console.
RT @jajarlalah: จริงนะ เด็กๆวงอื่นก็เท่อ่ะ แต่ความซอนเบนิม7ปี มันมีคาริสม่าบางอย่างที่mature เปล่งออกมา 5คนเต็มเวที #MGMAVOTE #NUEST https…
RT @BlxxxT: แกเหนือสิ่งอื่นใดจริงๆ เจโน่หล่อ😂😂😂 #nomin https://t.co/VIOlb4Np53
FIG 1: Tweets output to the console
- Get Twitter Credentials
- Build basic Maven app
- Add TweetStream Class and Spark and Twitter libraries
- Modify code: Add Twitter credentials and terminate resources
- Stream the Tweets
First, run the Apache Spark Streaming JavaNetworkWordCount example.
Then use the Bahir Spark Streaming Twitter library extension for an easy interface to the Twitter feed:
https://bahir.apache.org/docs/spark/current/spark-streaming-twitter/
1. Get Twitter Credentials
- Set up a regular Twitter account at twitter.com.
- Apply for a Twitter developer account at the URL below.
- Apply to make a Twitter App, and answer the questions about your intended use.
- I was instantly granted access as an individual testing the API and not rebroadcasting.
- When granted access, obtain two pairs of Twitter credentials: the consumer key and secret, and the access token and secret.
Twitter Developer URL to apply for access:
https://developer.twitter.com/en/apply-for-access
2. Build Maven HelloWorld App
- In your IDE, choose a new Maven project with Java 1.8, created from maven-archetype-quickstart.
- In your IDE, run a Maven build to load the libraries. Build success!
- In your IDE, run App.java to test HelloWorld.
- If it fails, check in the IDE project settings that Maven, the SDK and the bytecode level are all set to Java 1.8.
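If the build still fails, it can help to confirm which Java version the IDE actually runs your code on. The class below is a minimal sketch for that; the class and method names are my own and are not part of the archetype. It only reports the running JVM, not the bytecode level set in the POM.

```java
// Hypothetical diagnostic class; the name is my own and is not generated by the archetype.
final class JavaVersionCheck {

    // Returns the Java specification version of the running JVM, for example "1.8" or "11".
    static String runtimeVersion() {
        return System.getProperty("java.specification.version");
    }

    // True when the version string denotes Java 8, which reports itself as "1.8".
    static boolean isJava8(String specVersion) {
        return "1.8".equals(specVersion);
    }

    public static void main(String[] args) {
        String version = runtimeVersion();
        String verdict = isJava8(version)
                ? " (matches the tutorial)"
                : " (differs from the tutorial's 1.8)";
        System.out.println("Running on Java " + version + verdict);
    }
}
```

Run it from the IDE the same way you run App.java; a mismatch suggests the project SDK setting is the place to look.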
3. Build Streaming Word Count Application
1. Get the Spark example streaming code, JavaNetworkWordCount.java (see the References for the link).
* This example, like the other Spark streaming examples, consumes text strings sent from a Netcat server. The Netcat commands in this tutorial are for the Windows version of Nmap Netcat (ncat).
2. Edit your POM file: update the Maven compiler to Java 1.8 or greater, and add the Spark dependencies.
* Versions are important. I am using Spark 2.4.0, the latest version as I write this, built for Scala 2.11, so that Spark SQL, Spark Core and Spark Streaming are compatible with each other.
<properties>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0</version>
    <scope>provided</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
  </dependency>
</dependencies>
FIG 2: Maven POM configuration file dependencies
URL: Maven Spark Repo
https://mvnrepository.com/artifact/org.apache.spark
3. In your IDE with Maven, import the new libraries and build.
4. Create a new Java class called JavaNetworkWordCount, and copy the Spark example code into your new empty class.
5. Modify the Java code.
Comment out the following argument check, because the host and port are hard-coded below.
if (args.length < 2) {
    System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
    System.exit(1);
}
Configure your Spark master (local[2] gives Spark two local threads: one for the receiver and one for processing)
SparkConf sparkConf = new SparkConf().setAppName("JavaTwitterFeed").setMaster("local[2]");
Use hostname and port variables
String host = "127.0.0.1";
String port = "9999";
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(host, Integer.parseInt(port), StorageLevels.MEMORY_AND_DISK_SER);
Comment out or delete this line so the class compiles; StreamingExamples is a helper from the Spark examples package and is not in your project.
StreamingExamples.setStreamingLogLevels();
FIG 3: Modifications to JavaNetworkWordCount java file for testing purposes
6. In Maven, run a build to compile.
7. Run the Netcat server and test it with a Netcat client. Type a few text strings through, then close the client and restart the server. Send several repeated words through to test word counting.
Start the server in a command console, then type the words to send
ncat -l 127.0.0.1 9999
blue blue green green green red red red
Start the client in another command console
ncat 127.0.0.1 9999
FIG 4: Netcat server configuration and messaging
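If ncat is not available on your machine, a few lines of plain Java can play the server's role. The sketch below is my own stand-in, not part of the Spark examples: the LineServer class and its method names are hypothetical. It listens on the same host and port, waits for one client (the Spark receiver), sends newline-terminated text, and closes the connection.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for "ncat -l 127.0.0.1 9999"; not part of the Spark examples.
final class LineServer {

    // Joins the lines into one payload in which every line ends with '\n'.
    static String payload(List<String> lines) {
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            sb.append(line).append('\n');
        }
        return sb.toString();
    }

    // Accepts a single client, sends the payload as UTF-8, then closes the connection.
    static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             OutputStream out = client.getOutputStream()) {
            out.write(payload(lines).getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // Same host and port as the values hard-coded in the tutorial.
        try (ServerSocket server = new ServerSocket(9999, 1, InetAddress.getByName("127.0.0.1"))) {
            System.out.println("Listening on 127.0.0.1:9999, start the Spark application now");
            serveOnce(server, Arrays.asList("blue blue green green green red red red"));
        }
    }
}
```

Because this sketch closes the connection after one send, run it again whenever you want to push another batch of words.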
8. Once the server is running, run the Java Spark application JavaNetworkWordCount in the IDE. Spark should initialize and then process the input stream. If it does not connect, restart the message server and the Spark application. Watch the Spark console in your IDE for your word counts, and stop the application once you have seen them go by.
From the Spark log
Finished job streaming job 1562552643000 ms.0 from job set of time 1562552643000 ms
-------------------------------------------
Time: 1562552643000 ms
-------------------------------------------
(blue,2)
(green,3)
(red,3)
19/07/07 19:24:04 INFO JobScheduler: Starting job streaming job 1562552644000 ms.0 from job set of time 1562552644000 ms
FIG 5: Spark console output in your IDE when you run Netcat and JavaNetworkWordCount
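To make the counts in the log less mysterious, here is a plain-Java sketch of the counting that happens on each batch of lines: split every line on spaces and add one per word. It is an illustration only and runs without Spark; the BatchWordCount class is my own, and unlike a straight split it skips empty strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Plain-Java illustration only; the class name is hypothetical and no Spark is involved.
final class BatchWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    // Counts the words in one batch of lines; TreeMap keeps the words in alphabetical order.
    static Map<String, Integer> count(List<String> batch) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : batch) {
            for (String word : SPACE.split(line)) {
                if (!word.isEmpty()) {            // assumption: ignore empty tokens
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("blue blue green green green red red red");
        System.out.println(count(batch)); // prints {blue=2, green=3, red=3}
    }
}
```

The result matches the (blue,2), (green,3), (red,3) pairs in the Spark console output above.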
4. Add Twitter Streaming framework and Code
You have proven that your installation of Apache Spark Streaming is operational. Now start a fresh Java application class: copy the JavaNetworkWordCount class you have been working with and save it as TwitterFeed.java. I use TwitterUtils from Bahir.
In your IDE, open your Maven configuration file (POM) and add the Maven dependency for the Apache Bahir project's Spark Twitter library, version 2.3.3 for Scala 2.11.
URL for the Maven dependency
https://mvnrepository.com/artifact/org.apache.bahir/spark-streaming-twitter
<dependency>
  <groupId>org.apache.bahir</groupId>
  <artifactId>spark-streaming-twitter_2.11</artifactId>
  <version>2.3.3</version>
</dependency>
FIG 6: Bahir Maven dependency
“TwitterUtils uses Twitter4j to get the public stream of tweets using Twitter’s Streaming API. Authentication information can be provided by any of the methods supported by Twitter4J library. You can import the TwitterUtils class and create a DStream with TwitterUtils.createStream as shown below.”
When Twitter notifies you that your application credentials are ready, obtain both sets of keys. In your IDE, open TwitterFeed, your copy of JavaNetworkWordCount, and add the authentication credentials (keys) to it. You must define the variables consumerKey, consumerSecret, accessToken and accessTokenSecret yourself, or hard-code your keys in their place.
System.setProperty("twitter4j.oauth.consumerKey", consumerKey);
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret);
System.setProperty("twitter4j.oauth.accessToken", accessToken);
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret);
FIG 7: Twitter API credentials
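Hard-coding keys makes it easy to commit them to source control by accident. One alternative is to read them from environment variables and then set the same four system properties. The sketch below assumes that approach; the TwitterCredentials class and the environment variable names are my own choices for illustration, so rename them as you like.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; the environment variable names are assumptions made for this sketch.
final class TwitterCredentials {

    // Maps each Twitter4J system property used in the tutorial to an assumed variable name.
    static final Map<String, String> PROPERTY_TO_ENV = new LinkedHashMap<>();
    static {
        PROPERTY_TO_ENV.put("twitter4j.oauth.consumerKey", "TWITTER_CONSUMER_KEY");
        PROPERTY_TO_ENV.put("twitter4j.oauth.consumerSecret", "TWITTER_CONSUMER_SECRET");
        PROPERTY_TO_ENV.put("twitter4j.oauth.accessToken", "TWITTER_ACCESS_TOKEN");
        PROPERTY_TO_ENV.put("twitter4j.oauth.accessTokenSecret", "TWITTER_ACCESS_TOKEN_SECRET");
    }

    // Checks that all four values are present, then copies them into system properties.
    static void apply(Map<String, String> source) {
        for (String envName : PROPERTY_TO_ENV.values()) {
            String value = source.get(envName);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException("Missing credential: " + envName);
            }
        }
        for (Map.Entry<String, String> entry : PROPERTY_TO_ENV.entrySet()) {
            System.setProperty(entry.getKey(), source.get(entry.getValue()).trim());
        }
    }

    public static void main(String[] args) {
        apply(System.getenv());
        System.out.println("Twitter credentials copied into system properties");
    }
}
```

In TwitterFeed you would call TwitterCredentials.apply(System.getenv()) before creating the stream, in place of the four System.setProperty lines.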
Coding
- Set the batch duration of the JavaStreamingContext (40 seconds here).
- Initialize the JavaReceiverInputDStream object by passing the JavaStreamingContext to the TwitterUtils.createStream method.
- Remove the old usage of socketTextStream.
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(40000));
Remove this code
// JavaReceiverInputDStream<String> lines = javaStreamingContext.socketTextStream(
//     host, Integer.parseInt(port), StorageLevels.MEMORY_AND_DISK_SER);
Add this code
JavaReceiverInputDStream<Status> javaReceiverInputDStream = TwitterUtils.createStream(javaStreamingContext);
FIG 8: Add Twitter code
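The anonymous function below maps each Status in the stream to its text, so the tweets print line by line. It replaces the word-count code copied from JavaNetworkWordCount, which used the old lines variable and should be removed.
// Function is org.apache.spark.api.java.function.Function; Status is twitter4j.Status
JavaDStream<String> tweets = javaReceiverInputDStream.map(new Function<Status, String>() {
    @Override
    public String call(Status s) {
        return s.getText();
    }
});
tweets.print();
FIG 9: Function that maps the stream of Status objects to a JavaDStream of strings, one per tweet.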
Finally, once the stream is defined, start the streaming context. Nothing is processed until start() is called; awaitTermination() then blocks until the context is stopped.
javaStreamingContext.start();
try {
    javaStreamingContext.awaitTermination();
} catch (InterruptedException e) {
    e.printStackTrace();
}
FIG 9: Start the streaming context and wait for termination
How it works under the covers between the Application and Twitter
FIG 10: Twitter’s own Streaming API under the covers.
Spark interfaces with Bahir, which uses Twitter4J to go to Twitter. (From Twitter)
https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data.html
5. Stream the Tweets
Now build the Maven project and run the TwitterFeed class. The Spark console should show continuous processing and output:
19/07/06 20:44:30 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 73 ms on localhost (executor driver) (1/1)
19/07/06 20:44:30 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/07/06 20:44:30 INFO DAGScheduler: ResultStage 1 (print at TweetStream.java:56) finished in 0.094 s
19/07/06 20:44:30 INFO DAGScheduler: Job 1 finished: print at TweetStream.java:56, took 0.115272 s
Time: 1562469690000 ms
-------------------------------------------
FIG 11: Spark log output while the application runs
RT @Kimsaa97: เพื่อนที่ซับพอร์ตกันแบบนี้ควรรักษาไว้นะคะ ❤️ ทีมหน้าจอไลฟ์และเน๊อะ #GOT7inLA
RT @C3eOnHti5HZJ4A2: @jintei01 いや自分達も乗ってたんです
RT @abura_goma: ドラク工主入公×力ミュカプオンリーお願いします!!!!!!!!
RT @havoc129: มีคนเคยมาคอมมิชภาพสีน้ำที่เคยทำแนวนี้เป็นฉาก + ตลคอีก2ตัว จะเอาไปเป็นปก พอประเมินราคาไป ก็โด
「進化したナノイー&ダブルミネラルで摩擦ダメージや紫外線に強い髪へ」…
RT @BlxxxT: แกเหนือสิ่งอื่นใดจริงๆ เจโน่หล่อ😂😂😂 #nomin https://t.co/VIOlb4Np53
FIG 12: Tweets Displaying in Spark Console
References
Links from article for reference and further reading
- https://developer.twitter.com/en/apply-for-access
- https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
- https://bahir.apache.org/docs/spark/current/spark-streaming-twitter/
- http://twitter4j.org/en/index.html
- https://developer.twitter.com/en/docs/tutorials/consuming-streaming-data
- https://telegraphhillsoftware.com/starting-the-spark/