Monday 23 May 2016

Exercise 1: Ingest and query relational data

In this scenario, DataCo's business question is: What products do our customers like to buy? To answer this question, the first thought might be to look at the transaction data, which should indicate what customers actually do buy and like to buy, right?
This is probably something you can do in your regular RDBMS environment, but a benefit of Apache Hadoop is that you can do it at greater scale and lower cost, on the same system you may also use for many other types of analysis.
What this exercise demonstrates is how to do exactly the same thing you already know how to do, but in CDH. Seamless integration is important when evaluating any new infrastructure. Hence, it's important to be able to do what you normally do, and not break any regular BI reports or workloads over the dataset you plan to migrate.

To analyze the transaction data in the new platform, we need to ingest it into the Hadoop Distributed File System (HDFS). We need a tool that easily transfers structured data from an RDBMS to HDFS, while preserving structure. That enables us to query the data without interfering with or breaking any regular workload on it.
Apache Sqoop, which is part of CDH, is that tool. The nice thing about Sqoop is that we can automatically load our relational data from MySQL into HDFS, while preserving the structure. With a few additional configuration parameters, we can take this one step further and load this relational data directly into a form ready to be queried by Apache Impala (incubating), the MPP analytic database included with CDH, and other workloads.
You should first log in to the Master Node of your cluster via a terminal. Then, launch the Sqoop job:
> sqoop import-all-tables \
    -m {{cluster_data.worker_node_hostname.length}} \
    --connect jdbc:mysql://{{cluster_data.manager_node_hostname}}:3306/retail_db \
    --username=retail_dba \
    --password=cloudera \
    --compression-codec=snappy \
    --as-parquetfile \
    --warehouse-dir=/user/hive/warehouse
This command may take a while to complete, but it is doing a lot. It is launching MapReduce jobs to pull the data from our MySQL database and write the data to HDFS in parallel, distributed across the cluster in Apache Parquet format. It is also creating tables to represent the HDFS files in Impala/Apache Hive with matching schema.
Parquet is a format designed for analytical applications on Hadoop. Instead of grouping your data into rows like typical data formats, it groups your data into columns. This is ideal for many analytical queries where instead of retrieving data from specific records, you're analyzing relationships between specific variables across many records. Parquet is designed to optimize data storage and retrieval in these scenarios.
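To make the row-versus-column distinction concrete, here is a toy sketch in plain Python (purely illustrative; real Parquet adds row groups, encodings, and compression on top of this idea):

```python
# Toy illustration of row-oriented vs. column-oriented storage.
# Real Parquet is far more sophisticated; this only shows the layout idea.

# Row-oriented: each record is stored together, like a CSV line or database row.
rows = [
    {"product": "hat", "price": 10, "qty": 3},
    {"product": "bat", "price": 25, "qty": 1},
    {"product": "mat", "price": 15, "qty": 7},
]

# Column-oriented: each field is stored contiguously.
columns = {
    "product": ["hat", "bat", "mat"],
    "price":   [10, 25, 15],
    "qty":     [3, 1, 7],
}

# An analytical query like AVG(price) touches only one column; a columnar
# layout reads just that slice instead of scanning every full record.
avg_price = sum(columns["price"]) / len(columns["price"])
print(avg_price)  # 50/3 ≈ 16.67
```

The same contrast is why the Sqoop import above requests `--as-parquetfile`: the analytical queries in the next exercises read a few columns across many rows.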


When this command is complete, confirm that your data files exist in HDFS.
> hadoop fs -ls /user/hive/warehouse/
> hadoop fs -ls /user/hive/warehouse/categories/
These commands will show the directories and the files inside them that make up your tables:

Exercise 2: Correlate structured data with unstructured data

Since you are a pretty smart data person, you realize another interesting business question would be: are the most viewed products also the most sold? Since Hadoop can store unstructured and semi-structured data alongside structured data without remodeling an entire database, you can just as well ingest, store, and process web log events. Let's find out what site visitors have actually viewed the most.
For this, you need the web clickstream data. The most common way to ingest web clickstream data is to use Apache Flume. Flume is a scalable real-time ingest framework that allows you to route, filter, aggregate, and perform "mini-operations" on data on its way into the scalable processing platform.
In Exercise 4, later in this tutorial, you can explore a Flume configuration example to use for real-time ingest and transformation of our sample web clickstream data. However, to save time in this step, we won't wait for three days of data to be ingested. Instead, we prepared a web clickstream data set (just pretend you fast-forwarded three days) that you can bulk-upload directly into HDFS.
Bulk Upload Data
For your convenience, we have pre-loaded some sample access log data into /opt/examples/log_files/access.log.2.
Let's move this data from the local filesystem into HDFS.
> sudo -u hdfs hadoop fs -mkdir /user/hive/warehouse/original_access_logs
> sudo -u hdfs hadoop fs -copyFromLocal /opt/examples/log_files/access.log.2 /user/hive/warehouse/original_access_logs
The copy command may take several minutes to complete.
Verify that your data is in HDFS by executing the following command:
> hadoop fs -ls /user/hive/warehouse/original_access_logs
You should see a result similar to the following:

Now you can build a table in Hive and query the data via Apache Impala (incubating) and Hue. You'll build this table in two steps. First, you'll take advantage of Hive's flexible SerDes (serializers / deserializers) to parse the logs into individual fields using a regular expression. Second, you'll transfer the data from this intermediate table to one that does not require any special SerDe. Once the data is in that table, you can query it much faster and more interactively using Impala.
We'll use the Hive Query Editor app in Hue to execute the following queries:
CREATE EXTERNAL TABLE intermediate_access_logs (
    ip STRING,
    date STRING,
    method STRING,
    url STRING,
    http_version STRING,
    code1 STRING,
    code2 STRING,
    dash STRING,
    user_agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    'input.regex' = '([^ ]*) - - \\[([^\\]]*)\\] "([^\ ]*) ([^\ ]*) ([^\ ]*)" (\\d*) (\\d*) "([^"]*)" "([^"]*)"',
    'output.format.string' = "%1$$s %2$$s %3$$s %4$$s %5$$s %6$$s %7$$s %8$$s %9$$s")
LOCATION '/user/hive/warehouse/original_access_logs';
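Before running this DDL, it can be worth sanity-checking the regular expression outside Hive. The following Python snippet (with the Hive-escaped backslashes undone, and a made-up sample line) confirms the pattern captures the nine fields the table declares:

```python
import re

# The same pattern as in 'input.regex', with Hive's doubled backslashes undone.
LOG_PATTERN = re.compile(
    r'([^ ]*) - - \[([^\]]*)\] "([^ ]*) ([^ ]*) ([^ ]*)" (\d*) (\d*) "([^"]*)" "([^"]*)"'
)

# A made-up line in the combined log format this tutorial's data resembles.
sample = ('10.1.2.3 - - [14/Jun/2014:10:30:13 -0400] '
          '"GET /product/cart HTTP/1.1" 200 1024 "-" "Mozilla/5.0"')

m = LOG_PATTERN.match(sample)
# Nine capture groups, one per column of intermediate_access_logs.
ip, date, method, url, http_version, code1, code2, dash, user_agent = m.groups()
print(method, url, code1)  # GET /product/cart 200
```

If the unpacking fails or a field comes back empty, the regex and the table schema are out of sync.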

CREATE EXTERNAL TABLE tokenized_access_logs (
    ip STRING,
    date STRING,
    method STRING,
    url STRING,
    http_version STRING,
    code1 STRING,
    code2 STRING,
    dash STRING,
    user_agent STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/hive/warehouse/tokenized_access_logs';

ADD JAR {{lib_dir}}/hive/lib/hive-contrib.jar;

INSERT OVERWRITE TABLE tokenized_access_logs SELECT * FROM intermediate_access_logs;
The final query will take a minute to run. It is using a MapReduce job, just like our Sqoop import did, to transfer the data from one table to the other in parallel. You can follow the progress in the log below, and you should see the message 'The operation has no results.' when it's done.
Again, we need to tell Impala that some tables have been created through a different tool. Switch back to the Impala Query Editor app, and enter the following command:
invalidate metadata;
Now, if you enter the 'show tables;' query or refresh the table list in the left-hand column, you should see the two new external tables in the default database. Paste the following query into the Query Editor:
select count(*),url from tokenized_access_logs
where url like '%\/product\/%'
group by url order by count(*) desc;
You should see a result similar to the following:

Inspecting the results, you quickly realize that this list contains many of the products on the most-sold list from the previous exercise, with one notable exception: there is one product that seems to be viewed a lot, but never purchased. Why?

Well, in our example with DataCo, once these odd findings are presented to your manager, the issue is immediately escalated. Eventually, someone figures out that the view page where most visitors stopped, partway along the product's sales path, had a typo in the item's price. Once the typo was fixed and the correct price displayed, sales for that SKU started to increase rapidly.
If you had lacked an efficient and interactive tool enabling analytics on high-volume semi-structured data, this loss of revenue would have been missed for a long time. There is risk of loss if an organization looks for answers within partial data. Correlating two data sets for the same business question showed value, and being able to do so within the same platform made life easier for you and for the organization.
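As an aside, the query that surfaced this finding is just a filter, a group-by count, and a descending sort. If you want to reason about that logic outside the cluster, it can be sketched in a few lines of Python (the URLs here are made up, standing in for rows of tokenized_access_logs):

```python
from collections import Counter

# Hypothetical parsed URLs, standing in for the url column of the table.
urls = [
    "/product/hat", "/product/bat", "/product/hat",
    "/checkout", "/product/hat", "/product/mat",
]

# WHERE url LIKE '%/product/%'   ->  substring filter
# GROUP BY url, COUNT(*)         ->  Counter
# ORDER BY count(*) DESC         ->  most_common()
views = Counter(u for u in urls if "/product/" in u)
for url, count in views.most_common():
    print(count, url)
```

Impala executes the same shape of computation, but distributed across the cluster and over millions of rows.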
If you'd like to dive deeper into Hive, Impala, and other tools for data analysis in Cloudera's platform, you may be interested in Data Analyst Training.
For now, we'll explore some different techniques.

Exercise 4: Explore log events interactively

Since sales are dropping and nobody knows why, you want to provide a way for people to interactively and flexibly explore data from the website. We can do this by indexing it for use in Apache Solr, where users can do text searches, drill down through different categories, etc. Data can be indexed by Solr in batch using MapReduce, or you can index tables in HBase and get real-time updates. To analyze data from the website, however, we're going to stream the log data in using Flume.
The web log data is a standard web server log which may look something like this:

Solr organizes data similarly to the way a SQL database does. Each record is called a 'document' and consists of fields defined by the schema: just like a row in a database table. Instead of a table, Solr calls it a 'collection' of documents. The difference is that data in Solr tends to be more loosely structured. Fields may be optional, and instead of always matching exact values, you can also enter text queries that partially match a field, just like you're searching for web pages. You'll also see Hue refer to 'shards' - and that's just the way Solr breaks collections up to spread them around the cluster so you can search all your data in parallel.
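Under the hood, this style of free-text matching rests on an inverted index: a map from each term to the documents containing it. Here is a toy sketch in Python (Solr's real index adds analyzers, relevance scoring, and sharding on top of this idea):

```python
from collections import defaultdict

# Each 'document' is a dict of fields, loosely like a Solr document.
docs = [
    {"id": 1, "msg": "GET /product/hat returned 200"},
    {"id": 2, "msg": "GET /product/bat returned 404"},
    {"id": 3, "msg": "POST /checkout returned 200"},
]

# Build the inverted index: term -> set of document ids containing it.
index = defaultdict(set)
for doc in docs:
    for term in doc["msg"].lower().split():
        index[term].add(doc["id"])

# A term query returns every document containing the term, regardless
# of where in the field it appears.
print(sorted(index["200"]))  # [1, 3]
```

Sharding just splits `docs` across machines, builds an index per shard, and merges the per-shard results at query time.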
Here is how you can start real-time-indexing via Cloudera Search and Flume over the sample web server log data and use the Search UI in Hue to explore it:
Create Your Search Index
Ordinarily when you are deploying a new search schema, there are four steps:
  1. Creating an empty configuration
First, generate the configs by executing the following command:
> solrctl --zk {{zookeeper_connection_string}}/solr instancedir --generate solr_configs
The result of this command is a skeleton configuration that you can customize to your liking via the conf/schema.xml.
  2. Editing your schema
The most likely area in conf/schema.xml that you would be interested in is the <fields></fields> section. From this area you can define the fields that are present and searchable in your index.
  3. Uploading your configuration
> cd /opt/examples/flume
> solrctl --zk {{zookeeper_connection_string}}/solr instancedir --create live_logs ./solr_configs
  4. Creating your collection
> solrctl --zk {{zookeeper_connection_string}}/solr collection --create live_logs -s {{ number of solr servers }}
If {{zookeeper_connection_string}} has not been filled in automatically for your cluster, replace it with your ZooKeeper quorum (for example, the addresses of your three data nodes).
You can verify that you successfully created your collection in Solr by going to Hue and clicking Search in the top menu.

Then click on Indexes from the top right to see all of the indexes/collections.

Now you can see the collection that we just created, live_logs. Click on it.

You are now viewing the fields that we defined in our schema.xml file.

Now that you have verified that your search collection/index was created successfully, we can start putting data into it using Flume and Morphlines. Flume is a tool for ingesting streams of data into your cluster from sources such as log files, network streams, and more. Morphlines is a Java library for doing ETL on-the-fly, and it's an excellent companion to Flume. It allows you to define a chain of tasks like reading records, parsing and formatting individual fields, and deciding where to send them, etc. We've defined a morphline that reads records from Flume, breaks them into the fields we want to search on, and loads them into Solr (You can read more about Morphlines here). This example Morphline is defined at /opt/examples/flume/conf/morphline.conf, and we're going to use it to index our records in real-time as they're created and ingested by Flume.
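Conceptually, a morphline is a declarative chain of record transformations: read a raw event, split out fields, hand the result to a sink. That shape can be sketched as composed functions in Python (the field names and records here are invented for illustration, not taken from morphline.conf):

```python
# A minimal record-pipeline sketch: each stage takes a record (a dict)
# and returns a transformed record, mirroring a morphline's command chain.

def read_line(raw):
    # readLine-style stage: wrap the raw text in a record.
    return {"message": raw}

def parse_fields(record):
    # grok/split-style stage: break the message into named fields.
    ip, method, url = record["message"].split()
    return {**record, "ip": ip, "method": method, "url": url}

def load(record, sink):
    # loadSolr-style stage: hand the finished record to the sink.
    sink.append(record)
    return record

sink = []
for raw in ["10.0.0.1 GET /product/hat", "10.0.0.2 GET /checkout"]:
    load(parse_fields(read_line(raw)), sink)

print([r["url"] for r in sink])  # ['/product/hat', '/checkout']
```

The real morphline expresses the same chain declaratively in a config file, so you can change the ETL without recompiling anything.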
Flume and the Morphline
Now that we have an empty Solr index, and live log events coming in to our fake access.log, we can use Flume and morphlines to load the index with the real-time log data.
The key player in this tutorial is Flume. Flume is a system for collecting, aggregating, and moving large amounts of log data from many different sources to a centralized data source.
With a few simple configuration files, we can use Flume and a morphline (a simple way to accomplish on-the-fly ETL) to load our data into our Solr index.
You can use Flume to load many other types of data stores; Solr is just the example we are using for this tutorial.
Start the Flume agent by executing the following command:
> flume-ng agent \
    --conf /opt/examples/flume/conf \
    --conf-file /opt/examples/flume/conf/flume.conf \
    --name agent1
This will start running the Flume agent in the foreground. Once it has started, and is processing records, you should see something like:

Now you can go back to the Hue UI, and click 'Search' from the collection's page:

You will be able to search, drill down into, and browse the events that have been indexed.

If one of these steps fails, please reach out on the Discussion Forum for help. Otherwise, you can start exploring the log data and understanding what is going on.
For our story's sake, we pretend that you started indexing data at the same time as you started ingesting it (via Flume) to the platform, so that when your manager escalated the issue, you could immediately drill down into data from the last three days and explore what happened. For example, perhaps you noticed a lot of DDoS events and could take the right measures to preempt the attack. Problem solved! Management is fantastically happy with your recent contributions, which of course leads to a great bonus or something similar. :D
Now you have learned how to use Cloudera Search to explore data in real time, using Flume, Solr, and Morphlines. Further, you now understand how you can serve multiple use cases over the same data and, as in previous steps, combine multiple data sets to provide bigger insights. The flexibility and multi-workload capability of a Hadoop-based Enterprise Data Hub are some of the core elements that have made Hadoop valuable to organizations worldwide.

Exercise 5: Building a Dashboard


To get started with building a dashboard with Hue, click on the pen icon.
This will take you into the edit mode where you can choose the different widgets and layouts that you would like to see. You can choose a number of options and configurations here, but for now, just drag a bar chart into the top gray row.
This will bring up the list of fields that are present in our index so that you can choose which field you would like to group by. Let's choose request_date.
For the sake of our display, choose +15MINUTES for the INTERVAL.
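Behind that setting, Solr is just rounding each document's timestamp down to the start of a 15-minute interval and counting documents per bucket. The rounding itself is easy to sketch in Python (the timestamps below are made up):

```python
from collections import Counter
from datetime import datetime

# Hypothetical request timestamps from the indexed log events.
timestamps = [
    datetime(2014, 6, 14, 10, 2),
    datetime(2014, 6, 14, 10, 14),
    datetime(2014, 6, 14, 10, 17),
    datetime(2014, 6, 14, 10, 44),
]

def bucket_15min(ts):
    # Round down to the start of the 15-minute interval, like a
    # +15MINUTES date facet gap.
    return ts.replace(minute=ts.minute - ts.minute % 15,
                      second=0, microsecond=0)

counts = Counter(bucket_15min(ts) for ts in timestamps)
for start, n in sorted(counts.items()):
    print(start.strftime("%H:%M"), n)
```

Each bar in the chart corresponds to one such bucket, with its height given by the count.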
You aren't limited to a single column, you can view this as a two-column display as well. Select the two-column layout from the top left.
While you're here, let's drag a pie chart to the newly created row in the left column.
This time, let's choose department as the field that we want to group by for our pie chart.
Things are really starting to take shape! Let's add a Facet filter to the left-hand side, and select product as the facet.
Now that we are satisfied with our changes, let's click on the pencil icon to exit edit mode.
And save our dashboard.
On the Hue project's blog you can find a wide selection of video tutorials for accomplishing other tasks in Hue. For instance, you can watch a video of a Search dashboard similar to this example being created here.
You may also be interested in more advanced training on the technologies used in this tutorial, and other ways to index data in real-time and query it with Solr in Cloudera's Search Training course.