A simple example: how to call Python from Hive in HDInsight

Introduction

The Hadoop framework automatically distributes code execution across a multi-node cluster, and each node runs the code against its part of the dataset. Code for Hadoop can be written in Java, in which case one has to implement a map function and a reduce function; both take keys and values as inputs and produce keys and values as outputs. At a higher level, two scripting languages simplify the code: Pig has its own scripting language, while Hive looks like SQL, which makes Hive quite easy to use. Hive has a set of extension functions (called user-defined functions, or UDFs) to transform data, such as regular expression tools. A developer can add user-defined functions by writing them in Java. Another way to add procedural logic that complements the set-based SQL language is to use a language like Python.
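To make the map and reduce roles concrete, here is a minimal local sketch in Python. It is not Hadoop code: the names map_fn, reduce_fn and run_local are made up for this illustration, and the grouping step only imitates, in memory, what the framework does across nodes.

```python
from collections import defaultdict

def map_fn(key, value):
    """Map step: emit one (word, 1) pair for each word in a line of text."""
    for word in value.split():
        yield word.lower(), 1

def reduce_fn(key, values):
    """Reduce step: sum all the counts collected for one word."""
    return key, sum(values)

def run_local(lines):
    """Imitate the framework locally: map each record, group by key, reduce."""
    groups = defaultdict(list)
    for offset, line in enumerate(lines):
        for k, v in map_fn(offset, line):
            groups[k].append(v)
    return dict(reduce_fn(k, vs) for k, vs in sorted(groups.items()))

counts = run_local([
    "Hive looks like SQL",
    "Pig is a scripting language",
    "Hive calls Python",
])
print(counts["hive"])  # 2
```

Both functions only see keys and values, which is what lets a framework run them on any node without knowing what they compute.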

 

The goal of this post is to show an example of such a combination.

Here is how that could look on a small cluster. The workload is distributed across the different worker nodes:

At the worker node level, one Python process is created per core. Each process receives its part of the whole dataset:
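The idea that each process only sees its share of the rows can be sketched locally as follows. This is an illustration under assumptions: the round-robin policy, the function names and the sample rows are invented here, and Hadoop decides the real split itself.

```python
def split_rows(rows, n_workers):
    """Deal rows into n_workers chunks, round-robin (illustrative policy only)."""
    chunks = [[] for _ in range(n_workers)]
    for i, row in enumerate(rows):
        chunks[i % n_workers].append(row)
    return chunks

def label(row):
    """The per-row work: the same function runs on every chunk."""
    clientid, make, model = row
    return clientid, make + " " + model

# Made-up rows in the shape (clientid, devicemake, devicemodel).
rows = [
    ("100004", "Motorola", "Droid X"),
    ("100035", "LG", "VS910"),
    ("100041", "RIM", "9650"),
]

chunks = split_rows(rows, 2)                                # one chunk per process
partial = [[label(r) for r in chunk] for chunk in chunks]   # independent work
merged = sorted(r for part in partial for r in part)        # results gathered
print(merged)
```

Because the per-row work does not depend on other rows, the merged result is the same however the rows are split.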

Windows Azure comes with its own Hadoop-as-a-service offering, called HDInsight. It lets you run Hive, Pig, and other MapReduce jobs a few minutes after requesting the creation of a cluster. For Hive, HDInsight comes with a sample table. Let’s run a Hive + Python job against that hivesampletable table.

Hive and Python Script

In this example, we use a Python module to calculate the hash of a label in the sample table.

Hive is used to get the data, partition it, and send the rows to the Python processes that are created on the different cluster nodes. Here is the code:

add file simple_sample.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'D:\Python27\python.exe simple_sample.py' AS 
    (clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

This can be read as: from the hivesampletable table, select clientid, devicemake and devicemodel, and pass them to the simple_sample.py Python script, which is run with D:\Python27\python.exe. The script sends back the columns clientid (a string), phoneLabel (a string) and phoneHash (a string). The result is ordered by clientid and limited to the first 50 rows.

Hive sends data to the simple_sample.py script. Here is the code of that script:
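The data flow of such a query can be imitated locally: rows are turned into TAB-separated lines, handed to a script, and the lines that come back are parsed into the declared output columns. The sketch below is an assumption-laden stand-in, not Hive's implementation; transform, upper_make and the sample rows are hypothetical.

```python
def to_tsv(row):
    """Serialize one row the way a streaming script receives it: TAB-separated."""
    return "\t".join(row) + "\n"

def from_tsv(line, n_cols):
    """Parse one line written by the script back into n_cols columns."""
    return tuple(line.rstrip("\n").split("\t", n_cols - 1))

def transform(rows, script, n_out):
    """rows -> TSV lines -> script -> TSV lines -> rows."""
    lines_out = script(to_tsv(r) for r in rows)
    return [from_tsv(line, n_out) for line in lines_out]

def upper_make(lines):
    """Hypothetical script: upper-cases the second column of each line."""
    for line in lines:
        clientid, make, model = line.rstrip("\n").split("\t")
        yield "\t".join([clientid, make.upper(), model]) + "\n"

rows = [("100035", "LG", "VS910"), ("100004", "Motorola", "Droid X")]

# Local analogue of ORDER BY clientid LIMIT 1, applied to the script's output.
result = sorted(transform(rows, upper_make, 3))[:1]
print(result)  # [('100004', 'MOTOROLA', 'Droid X')]
```

The script never sees column names or types, only text lines; the names and types come from the AS clause of the query.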

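import sys
import hashlib

# Hive streams each input row to stdin as one line of TAB-separated columns.
while True:
    line = sys.stdin.readline()
    if not line:
        # An empty string means end of input.
        break

    # Remove the trailing newline and surrounding spaces (TABs are kept).
    line = line.strip("\n ")
    clientid, devicemake, devicemodel = line.split("\t")
    phone_label = devicemake + ' ' + devicemodel
    # Write the output row to stdout, TAB-separated (Python 2 print statement).
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])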

This script reads lines from stdin. It parses each line to obtain the columns passed by Hive: clientid, devicemake, devicemodel. From those columns, it derives the resulting columns: clientid, phoneLabel, phoneHash. To calculate phoneHash, it uses an imported module (hashlib). To output the result, the Python script writes it to stdout, with columns separated by TABs.
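The same logic can be tried on a workstation without a cluster. The variant below is a sketch in Python 3 (an assumption, since the cluster in this post runs Python 2.7): it takes the input and output streams as parameters so in-memory text can stand in for stdin and stdout. The function name transform_stream is made up, and unlike the script above it skips blank lines instead of failing on them.

```python
import hashlib
import io

def transform_stream(stdin, stdout):
    """Read TAB-separated rows from stdin, write label and MD5 hash to stdout."""
    for line in stdin:
        line = line.strip("\n ")
        if not line:
            continue  # deviation from the original script: ignore blank lines
        clientid, devicemake, devicemodel = line.split("\t")
        phone_label = devicemake + " " + devicemodel
        # In Python 3, hashlib works on bytes, so the label is encoded first.
        phone_hash = hashlib.md5(phone_label.encode("utf-8")).hexdigest()
        stdout.write("\t".join([clientid, phone_label, phone_hash]) + "\n")

# In-memory streams replace the pipes that Hive would provide.
source = io.StringIO("100035\tLG\tVS910\n100041\tRIM\t9650\n")
sink = io.StringIO()
transform_stream(source, sink)
print(sink.getvalue(), end="")
```

Passing sys.stdin and sys.stdout to the function would give the same streaming behaviour as the script.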

Let’s run it with PowerShell

Here is a sample PowerShell script that:

  • creates an HDInsight cluster
  • runs the job
  • gets the result
  • removes the cluster

Before running the script, the Hive script and the Python script must have been copied to Windows Azure storage:

Here is the PowerShell script:

Import-Module azure
Add-AzureAccount

$Subscription = 'Azdem169A44055X'
$defaultStorageAccount = 'monstockageazure'
$clusterName = 'monclusterhadoop'
$clusterVersion='2.1'
$clusterAdmin = 'cornac'
$clusterPassword = 'LElzgqy#n87'

$passwd = ConvertTo-SecureString $clusterPassword -AsPlainText -Force
$clusterCredentials = New-Object System.Management.Automation.PSCredential ($clusterAdmin, $passwd)

Set-AzureSubscription -SubscriptionName $Subscription -CurrentStorageAccount $defaultStorageAccount
Select-AzureSubscription -Current $Subscription

$storageAccount1 = (Get-AzureSubscription $Subscription).CurrentStorageAccountName
$key1 = Get-AzureStorageKey -StorageAccountName $storageAccount1 | %{ $_.Primary }

New-AzureHDInsightClusterConfig -ClusterSizeInNodes 3 |
    Set-AzureHDInsightDefaultStorage -StorageAccountName "${storageAccount1}.blob.core.windows.net" -StorageAccountKey $key1 `
        -StorageContainerName $clusterName |
    New-AzureHDInsightCluster -Name $clusterName -Version $clusterVersion -Location "North Europe" -Credential $clusterCredentials

Use-AzureHDInsightCluster "monclusterhadoop"

$hiveJobVT = New-AzureHDInsightHiveJobDefinition -File "wasb://<container>@<storageaccount>.blob.core.windows.net/simple_sample.hql"
$hiveJobVT.Files.Add("wasb://<container>@<storageaccount>.blob.core.windows.net/simple_sample.py")
$startedHiveJobVT = $hiveJobVT | Start-AzureHDInsightJob -Credential $clusterCredentials -Cluster "monclusterhadoop"

$startedHiveJobVT | Wait-AzureHDInsightJob -Credential $clusterCredentials

Get-AzureHDInsightJobOutput -StandardError -JobId $startedHiveJobVT.JobId -Cluster "monclusterhadoop"
Get-AzureHDInsightJobOutput -StandardOutput -JobId $startedHiveJobVT.JobId -Cluster "monclusterhadoop"

Remove-AzureHDInsightCluster -Name $clusterName

Here is a sample execution result:

PS C:\benjguin\BigData_Hadoop\demos\simple> Import-Module azure
Add-AzureAccount


PS C:\benjguin\BigData_Hadoop\demos\simple> Import-Module azure
Add-AzureAccount

$Subscription = 'Azdem169A44055X'
$defaultStorageAccount = 'monstockageazure'
$clusterName = 'monclusterhadoop'
$clusterVersion='2.1'
$clusterAdmin = 'cornac'
$clusterPassword = 'LElzgqy#n87'

$passwd = ConvertTo-SecureString $clusterPassword -AsPlainText -Force
$clusterCredentials = New-Object System.Management.Automation.PSCredential ($clusterAdmin, $passwd)

Set-AzureSubscription -SubscriptionName $Subscription -CurrentStorageAccount $defaultStorageAccount
Select-AzureSubscription -Current $Subscription

$storageAccount1 = (Get-AzureSubscription $Subscription).CurrentStorageAccountName
$key1 = Get-AzureStorageKey -StorageAccountName $storageAccount1 | %{ $_.Primary }

New-AzureHDInsightClusterConfig -ClusterSizeInNodes 3 |
    Set-AzureHDInsightDefaultStorage -StorageAccountName "${storageAccount1}.blob.core.windows.net" -StorageAccountKey $key1 `
        -StorageContainerName $clusterName |
    New-AzureHDInsightCluster -Name $clusterName -Version $clusterVersion -Location "North Europe" -Credential $clusterCredentials



ClusterSizeInNodes    : 3
ConnectionUrl         : https://monclusterhadoop.azurehdinsight.net
CreateDate            : 03/03/2014 14:15:50
DefaultStorageAccount : monstockageazure.blob.core.windows.net
HttpUserName          : cornac
Location              : North Europe
Name                  : monclusterhadoop
State                 : Running
StorageAccounts       : {}
SubscriptionId        : 0fa85b4c-aa27-44ba-84e5-fa51aac32734
UserName              : cornac
Version               : 2.1.4.0.526800
VersionStatus         : Compatible

PS C:\benjguin\BigData_Hadoop\demos\simple> Use-AzureHDInsightCluster "monclusterhadoop"

$hiveJobVT = New-AzureHDInsightHiveJobDefinition -File "wasb://<container>@<storageaccount>.blob.core.windows.net/simple_sample.hql"
$hiveJobVT.Files.Add("wasb://<container>@<storageaccount>.blob.core.windows.net/simple_sample.py")
$startedHiveJobVT = $hiveJobVT | Start-AzureHDInsightJob -Credential $clusterCredentials -Cluster "monclusterhadoop"

$startedHiveJobVT | Wait-AzureHDInsightJob -Credential $clusterCredentials

Get-AzureHDInsightJobOutput -StandardError -JobId $startedHiveJobVT.JobId -Cluster "monclusterhadoop"
Get-AzureHDInsightJobOutput -StandardOutput -JobId $startedHiveJobVT.JobId -Cluster "monclusterhadoop"
Successfully connected to cluster monclusterhadoop


Cluster         : monclusterhadoop
ExitCode        : 0
Name            : Hive: simple_sample.hql
PercentComplete : map = 100%,  reduce = 100%
Query           : 
State           : Completed
StatusDirectory : b4328d2f-589c-412e-83e5-f8a544cb321c
SubmissionTime  : 03/03/2014 14:36:48
JobId           : job_201403031426_0003


Logging initialized using configuration in file:/C:/apps/dist/hive-0.11.0.1.3.5.0-03/conf/hive-log4j.properties
Added resource: simple_sample.py
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>
Starting Job = job_201403031426_0004, Tracking URL = http://jobtrackerhost:50030/jobdetails.jsp?jobid=job_201403031426_0004
Kill Command = "C:\apps\dist\hadoop-1.2.0.1.3.5.0-03\bin\hadoop.cmd" job  -kill job_201403031426_0004
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2014-03-03 14:37:20,821 Stage-1 map = 0%,  reduce = 0%
2014-03-03 14:37:25,883 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 5.469 sec
2014-03-03 14:37:26,915 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 5.469 sec
2014-03-03 14:37:27,946 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 5.469 sec
2014-03-03 14:37:28,962 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 5.469 sec
2014-03-03 14:37:29,977 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 5.469 sec
2014-03-03 14:37:30,993 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 5.469 sec
2014-03-03 14:37:32,008 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 5.469 sec
2014-03-03 14:37:33,024 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 5.469 sec
2014-03-03 14:37:34,024 Stage-1 map = 100%,  reduce = 33%, Cumulative CPU 5.469 sec
2014-03-03 14:37:35,040 Stage-1 map = 100%,  reduce = 33%, Cumulative CPU 5.469 sec
2014-03-03 14:37:36,055 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 9.265 sec
2014-03-03 14:37:37,055 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 9.265 sec
2014-03-03 14:37:38,055 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 9.265 sec
MapReduce Total cumulative CPU time: 9 seconds 265 msec
Ended Job = job_201403031426_0004
MapReduce Jobs Launched: 
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 9.265 sec   HDFS Read: 266 HDFS Write: 2684 SUCCESS
Total MapReduce CPU Time Spent: 9 seconds 265 msec
OK
Time taken: 36.86 seconds, Fetched: 50 row(s)

100004    Motorola Droid X    02a4198bedd37119dabcbb2e8fb4ec92
100015    Apple iPod Touch 4.3.x    d9bc8c98d6a6556656e774a64f7b8bb2
100015    Apple iPod Touch 4.3.x    d9bc8c98d6a6556656e774a64f7b8bb2
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100035    LG VS910    b4bfdffa3e288ed0283ae8c8a37c455e
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100036    Samsung SCH-i400    6b314786cda6123fc06eeb855825aea7
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9

Remove-AzureHDInsightCluster -Name $clusterName

Benjamin (@benjguin)

Blog Post by: Benjamin GUINEBERTIERE

BizTalk Server Tip #29: Develop adapter using WCF

When developing new adapters create a Custom WCF Channel or use the WCF LOB SDK as a reference starting point, this will allow you to create a scalable and easy to host adapter that can be used across other .NET solutions. This level of flexibility will make the adapter more likely to be reused somewhere […]

The post BizTalk Server Tip #29: Develop adapter using WCF appeared first on BizTalk360 Blog.

Blog Post by: Ricardo Torre

SharePoint 2013 SP1 Released

Overview
We’ve been hearing Q1 2014 as a release date for SharePoint 2013 SP1 for some time now, and most of us have been thinking we’d get that date at SharePoint Conference 2014.  The conference Yammer feed was just updated with the news that SharePoint 2013 SP1 has been released.
SP2013 SP1 Download Info
http://blogs.technet.com/b/stefan_gossner/archive/2014/02/26/service-pack-1-for-sharepoint-2013-is-now-available-for-download.aspx
Installation Tips
Ensure you […]
Blog Post by: Michael Gerety

BizTalk Server Tip #28: Avoid Orchestrations when possible

Use static routing, content based routing or itineraries to avoid using Orchestrations and use routing of failed messages for advance error handling since messaging doesn’t provide a rich error handling capability. This approach will give you the high performance of messaging and the power of the Orchestrations when necessary. When a high volume of messages […]

The post BizTalk Server Tip #28: Avoid Orchestrations when possible appeared first on BizTalk360 Blog.

Blog Post by: Ricardo Torre

BizTalk Server Tip #27: Implement a Enterprise Service Bus using the ESB Toolkit

Use ESB Toolkit to implement a dynamic self-adapting solution. It enables the implementation of the ESB pattern within BizTalk by maximizing the re-use of services while at the same time maintaining the flexibility to easily change solution. The ESB Toolkit extends the capabilities of supporting a loosely coupled and dynamic messaging architecture. Here is a […]

The post BizTalk Server Tip #27: Implement a Enterprise Service Bus using the ESB Toolkit appeared first on BizTalk360 Blog.

Blog Post by: Ricardo Torre

BizTalk Orchestration timeouts for request-response operations without using a long running transactional scope

My current project requires an orchestration to be built that will call out to one or potentially many WCF Services through a sequential loop, the specific services to be called on within the loop being resolved from the Business Rules Engine based on message context. The orchestration needs to provide for guaranteed delivery and has […]
Blog Post by: Johann

BizTalk Server Tip #26: Test 100% of your code

Guarantee full code coverage by running Orchestration Profiler of tracking data of your load tests. When testing your BizTalk Applications it is important to guarantee all code is executed, even the error paths. The Orchestration profiler will help you understand how much of the code you covered with your test and also give important information […]

The post BizTalk Server Tip #26: Test 100% of your code appeared first on BizTalk360 Blog.

Blog Post by: Ricardo Torre

BizTalk 2009 – Error Enlisting Send Port – “Exception from HRESULT: 0xC00CE557″

It’s the second time that I have encountered this obscure error in BizTalk 2009, when attempting to enlist a send port after importing bindings originally exported via the admin console: Both times (for different customers) the issue was that an extra carriage return and line feed had been inputted before and after the <Filter> element […]
Blog Post by: James Corbould

Exploring the Windows Azure BizTalk Service Explorer Features

Windows Azure BizTalk Services (WABS) is now generally available as a service within Windows Azure. To test or debug your Windows Azure BizTalk Service Bridge, you can rely on the BizTalk Service Explorer. This explorer is similar to other Visual Studio add-ins, such as those for Service Bus or Windows Azure Storage.

The BizTalk Service explorer offers the following features:

Browsing artifacts