A SELECT or COUNT query in Hive will be executed as a MapReduce job even if the query is made against a small table or dataset.  Imagine that you want to execute one of these queries and it should only take a few seconds; the setup and teardown of the Hadoop job probably takes longer than the actual work portion of the job.  Now imagine that another user already has a complex, long-running job executing on the cluster.  Bad news for your job.  If you're using mostly the Hadoop default settings for the YARN scheduling algorithm, it's possible that your simple job won't be executed until the other one is finished.

A few months ago, one of our projects was in this exact situation.  What was worse was that we had multiple users needing to execute relatively simple and short running jobs in order to meet mini-milestones on their part of the project.  We had to find a solution.

Read more

When building a dashboard in Tableau, the analyst may want to filter many worksheets with a single filter selection. Tableau has three ways of making this possible: global quick filters, filter actions, and parameters.

Global quick filters will filter each worksheet on a dashboard as long as each worksheet contains that dimension. Filter actions use a "control" worksheet to filter other worksheets on the dashboard based on the elements selected in that sheet. Parameters allow the user to mimic a quick filter even when the dimension is different on each sheet.

In this post, we will explore the advantages and disadvantages of using each filter type and how to approach different use cases that call for dashboard filters.

Global Quick Filters

Advantages

Global quick filters are very useful when creating dashboards that contain worksheets that all use the same data source. For example, in a dashboard that displays the dataset in both text and visual forms, global quick filters give the flexibility to present the filter in a variety of formats: single value dropdown, multiple values list, wildcard match, etc. They also allow the user to show an aggregation of all marks with the "(All)" filter. 

Disadvantages

Of course, the main disadvantage of global quick filters is that they do not work when the worksheets in a dashboard each use a different data source. This is especially problematic when the component worksheets in a dashboard display similar concepts but are built from datasets optimized for the specific type of worksheet the user needs (table, bar chart, map, etc.). In this case, even if the datasets all have the same columns and data types, the analyst is forced to find an alternative solution.

Filter Actions

Read more

We recently ran into a weird problem whereby Oozie jobs would not progress past the PREP state. We were running the latest Hortonworks Data Platform 2.0.6.0 release. It turned out that the port number we were using for the JobTracker was not correct: the correct JobTracker port was 8050, while we had been trying to use port 8021. Here is the job.properties file that ended up working:

oozie.wf.application.path=hdfs://node1:8020/mnt/WordCount
jobTracker=node2:8050
nameNode=hdfs://node1:8020

where node1 is the NameNode and node2 hosts the JobTracker (the YARN ResourceManager) as well as the Oozie server.

Read more

Unit Testing Hive UDFs

As discussed in previous posts, a User Defined Table Function (UDTF) is a variant on the normal Hive UDF. Instead of reading one or more columns as input and writing a single column as output, the UDTF takes in one or more columns and writes multiple rows.

For UDFs, input records are passed to the following function for processing, with the result being used as the return value:
 
// for example (the actual return and argument types vary by UDF):
public Text evaluate(Text input);
 
This fits the normal JUnit testing framework, so traditional testing methods can be applied.
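For example, a plain JUnit test of a hypothetical UDF could look like the following (UpperCaseUDF is an illustrative class, not one from the original post):

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
import org.junit.Test;

// Hypothetical UDF used only for illustration: it upper-cases its input.
class UpperCaseUDF extends UDF {
    public Text evaluate(Text input) {
        return input == null ? null : new Text(input.toString().toUpperCase());
    }
}

public class UpperCaseUDFTest {
    @Test
    public void evaluateUpperCasesInput() {
        UpperCaseUDF udf = new UpperCaseUDF();
        // evaluate() returns a value, so an ordinary JUnit assertion suffices
        assertEquals(new Text("HELLO"), udf.evaluate(new Text("hello")));
    }
}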
 
However, for UDTFs, the input records are passed to the following function:
 
public void process(Object[] record);
 
Notice that the return value is "void". In the case of UDTFs, output values are written through calls to the "forward" method:
 
protected final void forward(java.lang.Object o);
 
Since both the process and forward methods have a void return value, this does not conform to the JUnit testing process, and an alternative approach is required.
 

AspectJ

AspectJ is an extension to the Java language that allows programmers to define "Aspects" - structures that provide mechanisms for overriding functionality in particular methods, or for adding functionality before or after a particular event. Events can be method calls, modifications of variables, initialization of classes, or thrown exceptions.

This technology is applicable to the UDTF case because it will allow us to apply AspectJ "advice" around the forward method - calling the normal Hive method during normal execution and calling a custom method that will fit into the JUnit framework during the testing phase.
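As a rough sketch of what that advice could look like, using annotation-style AspectJ; the pointcut and the TestRowCollector helper are assumptions about how the test weaving might be wired up, not the exact setup from the original post:

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;

// Woven only into the test build; production code never sees this aspect,
// so forward() behaves normally outside of tests.
@Aspect
public class ForwardCaptureAspect {

    // Intercept the UDTF's calls to GenericUDTF.forward(Object) and capture
    // the row for JUnit assertions instead of handing it to the Hive runtime.
    @Around("call(void org.apache.hadoop.hive.ql.udf.generic.GenericUDTF+.forward(Object)) && args(row)")
    public Object captureRow(ProceedingJoinPoint joinPoint, Object row) throws Throwable {
        TestRowCollector.add(row); // hypothetical collector that the tests inspect
        return null;               // skip proceed(): nothing is forwarded during tests
    }
}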

Read more

Background
As mentioned in the previous post, Lingual 1.0 was released at the end of 2013 alongside Cascading 2.2.  It generates Cascading code for SQL queries.  Lingual flows can be integrated into a Cascading cascade together with other Cascading and Lingual flows.  There are some caveats in doing this, since Lingual is a 1.0 release, as well as some additional tasks you might not think about when working with Cascading alone.  For our project, the developer could choose on a per-module basis whether to implement the workflow in Lingual.  In some cases, for queries that were moderately complex but still relatively simple, the processing was first attempted using Lingual, with Cascading used as a fallback if a query plan could not be generated for the Lingual query.  We found that we had to make a few adjustments and discovered some limitations in Lingual's first release.  Teams will have to decide whether the benefits of using Lingual outweigh the additional caveats around schemas and the early release.  Lingual is a promising step in the evolution of tools for data processing on Hadoop.
Schemas
Since Lingual is SQL processing, it needs to know the types (Java classes) of all the fields its taps read and write.  This is more type-restrictive than Cascading.  In a Cascading application, if you don't specify the field names, you can use Fields.ALL and the header will be parsed for them:
 
field1, field2, field3
 
So in Cascading you can get away with reading the field names from the header like this, since you don't need type information:
 
Tap tap = new Hfs( new TextDelimited( Fields.ALL, true, DataFile.DELIMITER  ), tapPath, sinkMode );
 
One way to specify tap information to Lingual automatically is to put the types in the header, separated from the field names by a colon:
 
field1:int, field2:int, field3:String
 
Then you can use Lingual's SQLTypeTextDelimited scheme to automatically parse the field names and types from the header.
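As a hedged sketch, the tap might then look like the following; the SQLTypeTextDelimited constructor arguments shown here (a delimiter and a quote character) are an assumption modeled on the TextDelimited usage above, so check the Lingual javadoc for the exact signature:

// assumption: SQLTypeTextDelimited takes a delimiter and a quote character and
// reads the field names and types from the typed header shown above
Tap tap = new Hfs( new SQLTypeTextDelimited( DataFile.DELIMITER, "\"" ), tapPath, sinkMode );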
 

Read more

If you are using a secured cluster, you may wish to segregate the data in HDFS so that different users are able to access different data. If you do this, you probably want to use this directory structure for all your services and tools.

For example, you will want to use only external tables in Hive and have the table locations be in specific directories on HDFS in your segregated directory structure. You will also want to lock down the default Hive warehouse location (/apps/hive/warehouse or /user/hive/warehouse) so that users won't be putting data into an insecure location that is accessible by all.

The most intuitive way to lock down the default warehouse is "hadoop fs -chmod 000 /apps/hive/warehouse". However, if you try to create an external table with the internal warehouse at 000 permissions, you will get an error similar to this:

Authorization failed:java.security.AccessControlException: action WRITE
not permitted on path hdfs://<hostname>:8020/apps/hive/warehouse for user
anastetsky. Use show grant to get more details.

Looks like Hive is still trying to write to the internal warehouse even when creating an external table!

This is a Hive bug. It seems that Hive requires the warehouse to have "write" permission even when creating an external table. If you unlock the warehouse and try again, it doesn't appear to actually write anything there; it just wants the directory to have the "write" permission.

There is a workaround.

You can set the warehouse to 222 instead of 000 (hadoop fs -chmod 222 /apps/hive/warehouse), which gives everyone "write" permission to it, but no "read" or "execute". Now creating an external table works. But won't users now be able to create internal tables and not discover their mistake until they try (and fail) to read them?

No, because creating an internal table and actually writing data to the internal warehouse also requires the "execute" permission, which the warehouse does not have (it's set to 222). You would get an error like the following:

FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:Got exception: org.apache.hadoop.security.AccessControlException Permission denied: user=anastetsky, access=EXECUTE, inode="/apps/hive/warehouse":hive:hdfs:d-w--w--w-

Read more

Introduction

The Hadoop framework keeps track of a set of built-in counters that it uses to track job progress. As an example, some of the ones displayed in the summary web page in the JobTracker are: the number of bytes read by the input format, the number of launched reduce tasks, the number of data local map tasks, and the number of bytes written by the output format. There is a mechanism that allows user-defined custom counters to be tracked alongside the built-in ones using the existing counter framework.
 

Creating Custom Counters

In order to create and use a custom counter, the first step is to create an Enum that will contain the names of all custom counters for a particular job. This can be achieved as follows:

enum CustomCounters {RED, BLUE}


Inside the map or reduce task, the counter can be adjusted using the following:

if(red)
    context.getCounter(CustomCounters.RED).increment(1); // increase the counter by 1
else if(blue)
    context.getCounter(CustomCounters.BLUE).increment(-1); // decrease the counter by 1

 

Programmatically Retrieving Custom Counter Values

The custom counter values will be displayed alongside the built-in counter values on the summary web page for a job viewed through the JobTracker. In addition to this, the values can be accessed programmatically with the following:
 
long redCounterValue = job.getCounters().findCounter(CustomCounters.RED).getValue();
 

Increasing the Counter Limit

The Hadoop framework does impose an upper bound on the number of counters that can be tracked per application. This is in place to ensure that no single job accidentally uses all available JobTracker memory during the course of its execution. While this may not be an issue for individual MapReduce jobs, some frameworks also use the counter mechanism to keep track of their own custom counters. Cascading is one such example, and the details of its counter logic are described in a previous Spry blog post. Another example is the graph framework Giraph. In either case, if the number of counters the framework attempts to track exceeds the limit (120 by default), the job will fail.

Fortunately, the property that limits the number of counters is configurable through mapred-site.xml.

In Hadoop 1.0: "mapreduce.job.counters.limit".
In Hadoop 2.0: "mapreduce.job.counters.max".
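On a Hadoop 2.x cluster, for example, the mapred-site.xml entry might look like this (500 is just an illustrative value):

<property>
  <name>mapreduce.job.counters.max</name>
  <!-- illustrative value; the default is 120 -->
  <value>500</value>
</property>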
 

Where can I go for more information?

Giraph FAQ
Map/Reduce JIRA ticket to make counter limits configurable
External Blog - Limiting Usage Counters In Hadoop
Or feel free to leave your question or comment below!

Read more

A member of the Spry team was selected to write a guest post for the Safari Books Online Blog. Please view the introduction below, but click through here to view the rest of the post.

This blog provides you with an introduction to Apache Giraph, covering details about graphs in general and the benefits of using Giraph.

Graph Vertices and Edges

One task that is extremely important for any analyst is the process of discovering and understanding connections. This process typically manifests in the form of graph processing. A graph can be defined as a collection of vertices and edges, where the vertices can have values and the edges can have directions and weights. Two of the most common ways of interacting with graphs include traversal (following edges between a set of connected vertices) and search (finding vertices in the graph that meet a set of conditions).
This graph structure of vertices and edges is extremely useful in a wide variety of real world applications. For example, imagine the problem of finding driving directions between two addresses. In this scenario, a graph of the roadway system can be built by considering roads as the edges, and intersections as the vertices. A related, larger problem over that same graph might be the process of optimizing the order in which a business makes its deliveries.
Even a natural system like the brain can be treated as a graph – where the billions of neurons are the vertices and the trillions of synapses connecting them are the edges. Once the brain is represented in this manner, research can be conducted to help us understand general brain function in addition to diseases and conditions that affect the passageways – like Alzheimer’s.

Graphs and MapReduce

In today’s analytic world, where data volume and velocity are growing faster than ever before, the benefits of using a parallel computing platform like Hadoop to process the information are clear. The appeal of Hadoop’s main building block – MapReduce – is that by transforming the input data into key/value pairs and splitting the pairs among the workers, the parts can be processed independently and then merged together to form the final result set. By design, there is no communication between tasks to ensure that no synchronization overhead affects task completion. Unfortunately, the traditional MapReduce style of execution does not lend itself to graph applications. A graph algorithm usually requires sending messages between vertices or performing “hops” to travel across an edge from one vertex to another as the bulk of the processing. Executing this in typical MapReduce fashion requires each hop or message to be processed in its own job.

Click here to view the rest of the post.

Read more

This is my first post on machine learning algorithms, so please forgive my briefness in some concepts/examples.
The Naive Bayes model (NBM) is probably the simplest of all machine learning algorithms involving Bayesian statistics. It is a great primer model, in the sense that it should be used as a first take on your data. Usually, the NBM is a favorite for text mining/classification and fits those kinds of applications really well. It makes oversimplifying assumptions, but is nonetheless a great tool for gaining preliminary insights into the data.

We first start off by defining conditional independence. Suppose we have three random variables A, B and C. These could be anything. For example, let them be the following:

A - will I have pizza for lunch today? (boolean variable, yes/no)

B - did I visit the cafeteria which has my favorite pizzeria (say, Tom's Delicious Pizzas)? (boolean variable, yes/no)

C - what's the name of the place on my receipt where I picked up lunch from today? (multivalued variable; say I usually visit 8 places in the cafeteria for lunch, of which Tom's Delicious Pizzas is one)

Now, if you look closely at the inter-relationship between these three variables, you'll observe that if I know whether or not my receipt has "Tom's Delicious Pizzas" on it, the variable B does not tell me anything new about predicting A! This is exactly what conditional independence means. "Conditioned" on the fact that I know C, the knowledge of B does not help me in any way to predict A. Hence, we can completely ignore B! In probabilistic terms, this is written as:

P(A|B, C) = P(A|C), provided A and B are conditionally independent given C.

Note: the "given C" part is really important. Without any knowledge of C, B would suddenly become very relevant to predicting A. It is absolutely important for us to know C for the above formula to hold. If the following is true:

P(A|B) = P(A)

then it means that A and B are completely (or totally) independent. This means that B really has nothing to do with A. You can imagine A and B to be something like:

A - do you have a cold? (boolean variable, yes/no)

Read more

One of the more difficult things to do in Cascading is to access small pieces of side data that you might not want to bring in through a join.  Cascading does have a mechanism for this, but it is limited.

 

Query Planning vs. Runtime

One of the things that beginners with Cascading have to understand is that most of the Java code you write actually creates the query plan rather than performing the processing, so there are only certain places in your code where you can actually access HDFS.  This can be done through the FlowProcess object in Cascading.  You are provided the FlowProcess object only in certain locations precisely because so much of the code exists to generate the query plan.

 

Mechanism

The mechanism in Cascading for accessing an HDFS file is the openForRead method on a Tap.  Note that any local file or HDFS file can be a Tap, but if the file is local it must be on the node where the flow is running.  Notice that the method takes a FlowProcess object.
 
public TupleEntryIterator openForRead(FlowProcess<Config> flowProcess)
                               throws IOException
 
 
A similar method, openTapForRead, is also available on the Flow itself.
 
 
Once the method is called, you have an iterator over the tap's tuple data.

The FlowProcess object roughly corresponds to a session of a flow while it is running.  It is only available for programmers to access in certain Cascading objects: operations such as Buffers, Aggregators, Functions, and Filters.  Assemblies do not have this option.  The pattern is to read the HDFS file in the operation's prepare method and store the data in the context; the context should then be released in the cleanup method, as sketched below.
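Here is a minimal sketch of that pattern, assuming a small lookup file on HDFS with "key" and "value" columns; the class name, field names, and file layout are illustrative, not taken from the original post:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.scheme.hadoop.TextDelimited;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;

// A sketch of a Function that loads a small HDFS lookup file in prepare()
// and emits the looked-up value for each incoming argument.
public class LookupFunction extends BaseOperation<Map<String, String>>
        implements Function<Map<String, String>> {

    private final String lookupPath;

    public LookupFunction( Fields fieldDeclaration, String lookupPath ) {
        super( 1, fieldDeclaration );
        this.lookupPath = lookupPath;
    }

    @Override
    public void prepare( FlowProcess flowProcess, OperationCall<Map<String, String>> operationCall ) {
        Map<String, String> lookup = new HashMap<String, String>();

        try {
            // open the side file through the FlowProcess provided to the operation
            Tap tap = new Hfs( new TextDelimited( new Fields( "key", "value" ), true, "," ), lookupPath );
            TupleEntryIterator iterator = tap.openForRead( flowProcess );

            while( iterator.hasNext() ) {
                TupleEntry entry = iterator.next();
                lookup.put( entry.getString( "key" ), entry.getString( "value" ) );
            }

            iterator.close();
        }
        catch( IOException exception ) {
            throw new RuntimeException( "unable to read lookup file: " + lookupPath, exception );
        }

        operationCall.setContext( lookup );
    }

    @Override
    public void operate( FlowProcess flowProcess, FunctionCall<Map<String, String>> functionCall ) {
        // emit the looked-up value for the first argument field
        String key = functionCall.getArguments().getString( 0 );
        functionCall.getOutputCollector().add( new Tuple( functionCall.getContext().get( key ) ) );
    }

    @Override
    public void cleanup( FlowProcess flowProcess, OperationCall<Map<String, String>> operationCall ) {
        operationCall.setContext( null ); // release the lookup data, per the pattern above
    }
}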

Read more
