Spry recently had the opportunity to write a guest post for the Hortonworks blog. The post is available in its entirety here; a sneak peek is given below!

In early 2014, Spry developed a solution that heavily utilized Hive for data transformations. When the project was complete, three distinct data sources were integrated through a series of HiveQL queries using Hive 0.11 on HDP 2.0. While the project was ultimately successful, the workflow itself took an astounding two full days to execute, with one query taking 11 hours.

Naturally, these lengthy query runtimes heavily impact each stage of the development process: imagine troubleshooting why the query that takes eleven hours returns an empty set! After the project was delivered, the whole team started to question whether Hive was a viable option for these types of analytic development projects. The wide variety of commercial tools available in the Hadoop-compatible SQL market prompted us to perform an analysis of alternatives.

Click to read more!

 

Read more

What is Kafka?

In terms of the distributed environment...

Apache Kafka is a distributed commit log service. It uses a language-independent TCP protocol to provide messaging over partitioned, replicated feeds called "topics". The partitioned logs are what is actually distributed: each active node runs a Kafka server (broker) and is responsible for handling the data and requests for its share of the partitions.
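As a concrete (and hedged) illustration of the producer side, the following sketch uses the Java producer client to append records to a topic. The broker address (localhost:9092) and topic name ("events") are assumptions for the example, not details from this post.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicWriter {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker location; in a real cluster this is a list of brokers.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // Each record is appended to one partition of the "events" topic.
        producer.send(new ProducerRecord<String, String>("events", "key-1", "hello kafka"));
        producer.close();
    }
}

Consumers then read each partition's log independently, which is what lets Kafka spread both storage and request load across the servers hosting the partitions.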

This post will provide an overview of these concepts and give you more insight into how Kafka functions.

Read more

In many of our use cases, the data we work with does not come ready to be fed into an analytics workflow. It must first be ingested and prepared. This includes renaming and/or reordering fields, changing data types, filtering out invalid values, and combining different parts of the same data source. In this post, we will cover how to perform these steps using a data pipeline tool called Alteryx, walking through a workflow built for one of our clients.

Read more

A SELECT or COUNT query in Hive is executed as a MapReduce job even when it runs against a small table or dataset.  Imagine that you want to execute one of these queries and it should only take a few seconds... in fact, the setup and teardown of the Hadoop job probably takes longer than the actual work portion of the job.  Now imagine that another user already has a complex, long-running job executing on the cluster.  Bad news for your job: if you're using mostly default settings for the YARN scheduler, it's possible that your simple job won't be executed until the other one is finished.

A few months ago, one of our projects was in this exact situation.  What was worse, we had multiple users who needed to execute relatively simple, short-running jobs in order to meet mini-milestones on their parts of the project.  We had to find a solution.
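The full post describes the fix we settled on; as a hedged sketch of the general idea (not necessarily our exact solution), the snippet below submits a MapReduce job to a dedicated YARN queue so that short jobs are not stuck behind long-running ones. The "short" queue name is hypothetical and assumes such a queue has already been configured in the scheduler.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ShortQueueJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Route this job to a hypothetical "short" queue instead of the default queue.
        conf.set("mapreduce.job.queuename", "short");

        Job job = Job.getInstance(conf, "quick-count");
        // ... configure the mapper, reducer, input and output paths as usual ...
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

For Hive queries, the same property can be set in the session (set mapreduce.job.queuename=short;) before running the query.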

Read more

When building a dashboard in Tableau, the analyst may want to filter many worksheets with a single filter selection. Tableau has three ways of making this possible: global quick filters, filter actions, and parameters.

Global quick filters will filter every worksheet on a dashboard, as long as each worksheet contains the filtered dimension. Filter actions use a "control" worksheet to filter the other worksheets on the dashboard based on the elements selected in that sheet. Parameters allow the user to apply what would otherwise be a quick filter even when the dimension is different on each sheet.

In this post, we will explore the advantages and disadvantages of using each filter type and how to approach different use cases that call for dashboard filters.

Global Quick Filters

Advantages

Global quick filters are very useful when creating dashboards that contain worksheets that all use the same data source. For example, in a dashboard that displays the dataset in both text and visual forms, global quick filters give the flexibility to present the filter in a variety of formats: single value dropdown, multiple values list, wildcard match, etc. They also allow the user to show an aggregation of all marks with the "(All)" filter. 

Disadvantages

Of course, the main disadvantage of global quick filters is that they do not work when the worksheets on a dashboard each use a different data source. This is especially problematic when the component worksheets in a dashboard display similar concepts but are built from datasets optimized for the specific type of worksheet the user needs (table, bar chart, map, etc.). In this case, even if the datasets all have the same columns and data types, the analyst is forced to find an alternative solution.

Filter Actions

Read more

We recently ran into a strange problem where Oozie jobs would not progress past the PREP state. We were running the latest Hortonworks Data Platform, v2.0.6.0. It turned out that the port number we were using for the JobTracker was not correct: the correct JobTracker port was 8050, while we had been trying to use port 8021. Here is the job.properties file that ended up working:

oozie.wf.application.path=hdfs://node1:8020/mnt/WordCount
jobTracker=node2:8050
nameNode=hdfs://node1:8020

where node1 is the NameNode and node2 hosts the Oozie server.

Read more

Unit Testing Hive UDFs

As discussed in previous posts, a User Defined Table Function (UDTF) is a variant on the normal Hive UDF. Instead of reading one or more columns as input and writing a single column as output, the UDTF takes in one or more columns and writes multiple rows.

For UDFs, input records are passed to an evaluate method for processing, with the result used as the output value, for example:
 
public Text evaluate(Text input);
 
This fits the normal JUnit testing framework, so traditional testing methods can be applied.
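For example, a test for a hypothetical UpperCaseUdf (a made-up UDF whose evaluate method upper-cases its input) looks like any other JUnit test:

import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class UpperCaseUdfTest {

    @Test
    public void evaluateReturnsUpperCasedValue() {
        // UpperCaseUdf is hypothetical: a class extending org.apache.hadoop.hive.ql.exec.UDF
        // that declares public String evaluate(String input).
        UpperCaseUdf udf = new UpperCaseUdf();
        assertEquals("SPRY", udf.evaluate("spry"));
    }
}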
 
However, for UDTFs, the input records are passed to the following function:
 
public void process(Object[] record);
 
Notice that the return value is "void". In the case of UDTFs, output values are written through calls to the "forward" method:
 
protected final void forward(java.lang.Object o);
 
Since both the process and forward methods have a void return value, this does not conform to the JUnit testing process, and an alternative approach is required.
 

AspectJ

AspectJ is an extension to the Java language that allows programmers to define "aspects": structures that provide mechanisms for overriding the functionality of particular methods, or for adding functionality before or after a particular event. Events can be method calls, modifications of variables, initialization of classes, or thrown exceptions.

This technology is applicable to the UDTF case because it allows us to apply AspectJ "advice" around the forward method: calling the normal Hive method during normal execution, and calling a custom method that fits into the JUnit framework during the testing phase.
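A minimal sketch of such an aspect, written in AspectJ's annotation style, is shown below. It assumes the UDTF classes are woven (compile-time or load-time weaving) and simply records every row passed to forward so that a JUnit test can assert on the captured output; the class and field names here are our own, not part of Hive or AspectJ.

import java.util.ArrayList;
import java.util.List;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;

@Aspect
public class ForwardCaptureAspect {

    // Rows the UDTF attempted to forward during the test run.
    public static final List<Object> CAPTURED_ROWS = new ArrayList<Object>();

    // Intercept calls to GenericUDTF.forward(...) made from woven UDTF code,
    // record the row, and skip the real forward() so no Hive collector is needed.
    @Around("call(void org.apache.hadoop.hive.ql.udf.generic.GenericUDTF+.forward(Object)) && args(row)")
    public Object captureForward(ProceedingJoinPoint joinPoint, Object row) {
        CAPTURED_ROWS.add(row);
        return null; // intentionally do not call joinPoint.proceed()
    }
}

A JUnit test can then call process(...) on the UDTF directly and assert against ForwardCaptureAspect.CAPTURED_ROWS.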

Read more

Background
As mentioned in the previous post, Lingual 1.0 was released at the end of 2013 alongside Cascading 2.2. It generates Cascading code for SQL queries, and Lingual flows can be combined in a Cascading cascade with other Cascading and Lingual flows.  There are some caveats in doing this, since Lingual is a 1.0 release, as well as some additional tasks that you might not think about when using Cascading alone.  For our project, the developer could choose on a per-module basis whether to implement the workflow in Lingual.  In some cases, for moderately complex but still simpler queries, the processing was first attempted using Lingual, with Cascading used as a fallback if a query plan could not be generated for the Lingual query.  We found that we had to make a few adjustments and discovered some limitations in Lingual's first release.  Teams will have to decide whether the benefits of using Lingual outweigh the additional caveats around schemas and early releases.  Lingual is a promising step in the evolution of tools for data processing on Hadoop.
Schemas
Since Lingual performs SQL processing, it needs to know the types (Java classes) of all the fields that its taps read and write.  This is more type-restrictive than Cascading.  In a Cascading application, if you don't specify the field names, you can simply parse them from a header like the following using Fields.ALL.
 
field1, field2, field3
 
So in Cascading you can get away with reading the field names from the header like this, since you don't need type information.
 
Tap tap = new Hfs( new TextDelimited( Fields.ALL, true, DataFile.DELIMITER  ), tapPath, sinkMode );
 
One way to specify this information to Lingual automatically is to put the type of each field in the header, separated from the field name by a colon.
 
field1:int, field2:int, field3:String
 
Then you can use Lingual's SQLTypeTextDelimited scheme to automatically parse the field names and types from the header.
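Alternatively, the names and types can be declared directly in code rather than in the file header. The following is a rough sketch using Cascading's Fields.applyTypes; the field names, types, and delimiter are placeholders, and this is our illustration rather than the SQLTypeTextDelimited approach described above.

import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;

// Declare field names and Java types explicitly so Lingual can build a SQL schema,
// instead of relying on Fields.ALL and an untyped header.
Fields typedFields = new Fields( "field1", "field2", "field3" )
        .applyTypes( Integer.TYPE, Integer.TYPE, String.class );

// tapPath as in the example above
Tap tap = new Hfs( new TextDelimited( typedFields, true, "," ), tapPath, SinkMode.KEEP );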
 

Read more

If you are using a secured cluster, you may wish to segregate the data in HDFS so that different users are able to access different data. If you do this, you probably want to use that segregated directory structure consistently across all of your services and tools.

For example, you will want to use only external tables in Hive and have the table locations be in specific directories on HDFS in your segregated directory structure. You will also want to lock down the default Hive warehouse location (/apps/hive/warehouse or /user/hive/warehouse) so that users won't be putting data into an insecure location that is accessible by all.

The most intuitive way to lock down the default warehouse is "hadoop fs -chmod 000 /apps/hive/warehouse". However, if you then try to create an external table while the internal warehouse has 000 permissions, you will get an error similar to this:

Authorization failed:java.security.AccessControlException: action WRITE
not permitted on path hdfs://<hostname>:8020/apps/hive/warehouse for user
anastetsky. Use show grant to get more details.

Looks like Hive is still trying to write to the internal warehouse even when creating an external table!

This is a Hive bug. It seems that Hive requires the warehouse to have "write" permission even when creating an external table. If you unlock the warehouse and try again, it doesn't look like Hive actually writes anything to it; it just wants the directory to have the "write" permission.

There is a workaround.

You can set the warehouse to 222 instead of 000 (for example, "hadoop fs -chmod 222 /apps/hive/warehouse"), which gives everyone "write" permission to it, but no "read" or "execute". Now, creating an external table works. But won't users be able to create internal tables and not notice their mistake until they try (and fail) to read them?

No, because creating an internal table and actually writing data to the internal warehouse also requires the "execute" permission, which the warehouse does not have (it's set to 222). You would get an error like the following:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Got exception: org.apache.hadoop.security.AccessControlException Permission denied: user=anastetsky, access=EXECUTE, inode="/apps/hive/warehouse":hive:hdfs:d-w--w--w-

Read more

Introduction

The Hadoop framework keeps track of a set of built-in counters that it uses to track job progress. For example, some of the counters displayed on the job summary web page in the JobTracker are: the number of bytes read by the input format, the number of launched reduce tasks, the number of data-local map tasks, and the number of bytes written by the output format. There is also a mechanism that allows user-defined custom counters to be tracked alongside the built-in ones using the existing counter framework.
 

Creating Custom Counters

In order to create and use a custom counter, the first step is to create an Enum that will contain the names of all custom counters for a particular job. This can be achieved as follows:

enum CustomCounters {RED, BLUE}


Inside the map or reduce task, the counter can be adjusted using the following:

if(red)
    context.getCounter(CustomCounters.RED).increment(1); // increase the counter by 1
else if(blue)
    context.getCounter(CustomCounters.BLUE).increment(-1); // decrease the counter by 1
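For context, a minimal sketch of a mapper that uses these counters might look like the following; the input format and the "one color name per line" logic are made up for illustration:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ColorCountMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    // Same enum as above, repeated here so the sketch is self-contained.
    enum CustomCounters {RED, BLUE}

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.startsWith("red")) {
            context.getCounter(CustomCounters.RED).increment(1);   // increase the counter by 1
        } else if (line.startsWith("blue")) {
            context.getCounter(CustomCounters.BLUE).increment(-1); // decrease the counter by 1
        }
        context.write(value, NullWritable.get());
    }
}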

 

Programmatically Retrieving Custom Counter Values

The custom counter values will be displayed alongside the built-in counter values on the summary web page for a job viewed through the JobTracker. In addition to this, the values can be accessed programmatically with the following:
 
long redCounterValue = job.getCounters().findCounter(CustomCounters.RED).getValue();
 

Increasing the Counter Limit

The Hadoop framework does impose an upper bound on the number of counters that can be tracked per application. This is in place to ensure that no single job accidentally uses all available JobTracker memory during the course of its execution. While this may not be an issue for individual MapReduce jobs, some frameworks also use this mechanism to keep track of their own custom counters. Cascading is one such example, and the details of its counter logic are described in a previous Spry blog post. Another example is the graph framework Giraph. In either case, if the number of counters the framework attempts to track exceeds the limit (120 by default), the job will fail.

Fortunately, the property that limits the number of counters is configurable through mapred-site.xml.

In Hadoop 1.0: "mapreduce.job.counters.limit".
In Hadoop 2.0: "mapreduce.job.counters.max".
 

Where can I go for more information?

Giraph FAQ
Map/Reduce JIRA ticket to make counter limits configurable
External Blog - Limiting Usage Counters In Hadoop
Or feel free to leave your question or comment below!

Read more
