You can also use HDInsight with Hive + Python.
The drawback of the latter is that you use streaming between Hive and Python. In Hadoop streaming is just a way to call stdin/stdout inter process communication. So if you just do simple operations like string concatenations between two fields in Python it may be slow. the good things is that hive has user defined functions and also standard ones that help do all the simple things (like string concatenation).
There’s a way to use Python language without using streaming: just run Python in the JVM (remember Hadoop is written in Java). Python in the JVM is Jython. And Pig (an equivalent to Hive that has its own scripting language, instead of using SQL) can call Jython scripts.
With HDInsight 3.0 which became generally available recently, you can use that kind of feature. Here’s how. In order to launch the job here, I use a script from Linux that leverages WebHCat / Templeton REST API from a Linux machine. Here is how.
The Python script that launches the job is the following:
import requests #http://pypi.python.org/pypi/requests clusterName='monclusterhadoop' clusterAdmin='cornac' clusterPassword='ChangeWithY0urs!' #get WebHCat status webHCatUrl='https://' + clusterName + '.azurehdinsight.net/templeton/v1/status' r = requests.get(webHCatUrl, auth=(clusterAdmin, clusterPassword)) print r.status_code print r.json() #submit a pig job: # http://docs.hortonworks.com/HDPDocuments/HDP1/HDP-Win-1.3.0/ds_HCatalog/pig.html webHCatUrl='https://' + clusterName + '.azurehdinsight.net/templeton/v1/pig' hive_params={'user.name':clusterAdmin, 'file':'wasb://[email protected]/scripts/pig_python/pig_python.pig', 'statusdir': '/wasbwork/pig_from_python'} r = requests.post(webHCatUrl, auth=(clusterAdmin, clusterPassword), data=hive_params) print r.status_code print r.json()
the pig job looks like this:
Register ‘wasb://[email protected]/scripts/pig_python/pig_python.py’ using jython as myfuncs;
a = load ‘wasb://[email protected]/data/ref_villes’ using PigStorage(‘ ‘) as (ville:chararray);
b = foreach a generate ville, myfuncs.helloworld(), myfuncs.square(3);
store b into ‘/wasbwork/pigresult’;
the Python script called by Pig and defines a few sample basic functions is the following:
#!/usr/bin/python @outputSchema("word:chararray") def helloworld(): return ('Hello, World') @outputSchema("t:(word:chararray,num:long)") def complex(word): return (str(word),long(word)*long(word)) @outputSchemaFunction("squareSchema") def square(num): return ((num)*(num)) @schemaFunction("squareSchema") def squareSchema(input): return input # No decorator - bytearray def concat(str): return str+str
Source data (ref_villes) looks like this (first lines) :
paris
marseille
lyon
toulouse
nice
nantes
strasbourg
montpellier
bordeaux
lille
rennes
reims
le havre
saint-etienne
toulon
grenoble
the output (part-m-00000) looks like this
paris Hello, World 9
marseille Hello, World 9
lyon Hello, World 9
toulouse Hello, World 9
nice Hello, World 9
nantes Hello, World 9
strasbourg Hello, World 9
montpellier Hello, World 9
bordeaux Hello, World 9
lille Hello, World 9
rennes Hello, World 9
reims Hello, World 9
le Hello, World 9
saint-etienne Hello, World 9
toulon Hello, World 9
grenoble Hello, World 9
the execution report looks like this (stderr):
2014-03-21 11:50:59,951 [main] INFO org.apache.pig.Main - Apache Pig version 0.12.0.2.0.7.0-1551 (r: unknown) compiled Feb 19 2014, 11:47:04
2014-03-21 11:50:59,951 [main] INFO org.apache.pig.Main - Logging error messages to: C:\apps\dist\hadoop-2.2.0.2.0.7.0-1551\logs\pig_1395402659935.log
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/apps/dist/hadoop-2.2.0.2.0.7.0-1551/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/apps/dist/pig-0.12.0.2.0.7.0-1551/pig-0.12.0.2.0.7.0-1551.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2014-03-21 11:51:00,810 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file D:\Users\hdp/.pigbootup not found
2014-03-21 11:51:00,997 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2014-03-21 11:51:00,997 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-03-21 11:51:00,997 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: wasb://[email protected]
2014-03-21 11:51:01,451 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-03-21 11:51:01,841 [main] INFO org.apache.pig.scripting.jython.JythonScriptEngine - created tmp python.cachedir=D:\Users\hdp\AppData\Local\Temp\pig_jython_5196260548692206718
2014-03-21 11:51:03,951 [main] WARN org.apache.pig.scripting.jython.JythonScriptEngine - pig.cmd.args.remainders is empty. This is not expected unless on testing.
2014-03-21 11:51:04,560 [main] INFO org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myfuncs.complex
2014-03-21 11:51:04,560 [main] INFO org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myfuncs.square
2014-03-21 11:51:04,576 [main] INFO org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myfuncs.helloworld
2014-03-21 11:51:04,576 [main] INFO org.apache.pig.scripting.jython.JythonScriptEngine - Register scripting UDF: myfuncs.concat
2014-03-21 11:51:04,701 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-03-21 11:51:04,951 [main] INFO org.apache.pig.scripting.jython.JythonFunction - Schema 'word:chararray' defined for func helloworld
2014-03-21 11:51:05,232 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig features used in the script: UNKNOWN
2014-03-21 11:51:05,326 [main] INFO org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier]}
2014-03-21 11:51:05,482 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.textoutputformat.separator is deprecated. Instead, use mapreduce.output.textoutputformat.separator
2014-03-21 11:51:05,763 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2014-03-21 11:51:05,810 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2014-03-21 11:51:05,810 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2014-03-21 11:51:06,091 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnode0/100.86.204.54:9010
2014-03-21 11:51:06,263 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2014-03-21 11:51:06,263 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.reduce.markreset.buffer.percent is deprecated. Instead, use mapreduce.reduce.markreset.buffer.percent
2014-03-21 11:51:06,263 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2014-03-21 11:51:06,263 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
2014-03-21 11:51:06,279 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job608857099241848139.jar
2014-03-21 11:51:14,294 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job608857099241848139.jar created
2014-03-21 11:51:14,294 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.jar is deprecated. Instead, use mapreduce.job.jar
2014-03-21 11:51:14,341 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2014-03-21 11:51:14,341 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Key [pig.schematuple] is false, will not generate code.
2014-03-21 11:51:14,341 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Starting process to move generated code to distributed cache
2014-03-21 11:51:14,341 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Setting key [pig.schematuple.classes] with classes to deserialize []
2014-03-21 11:51:14,404 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2014-03-21 11:51:14,404 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker.http.address is deprecated. Instead, use mapreduce.jobtracker.http.address
2014-03-21 11:51:14,404 [JobControl] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at headnode0/100.86.204.54:9010
2014-03-21 11:51:14,513 [JobControl] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2014-03-21 11:51:16,154 [JobControl] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 2
2014-03-21 11:51:16,154 [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 2
2014-03-21 11:51:16,185 [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2014-03-21 11:51:16,435 [JobControl] INFO org.apache.hadoop.mapreduce.JobSubmitter - number of splits:1
2014-03-21 11:51:16,732 [JobControl] INFO org.apache.hadoop.mapreduce.JobSubmitter - Submitting tokens for job: job_1395391185318_0006
2014-03-21 11:51:16,732 [JobControl] INFO org.apache.hadoop.mapreduce.JobSubmitter - Kind: mapreduce.job, Service: job_1395391185318_0005, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@45d45314)
2014-03-21 11:51:16,763 [JobControl] INFO org.apache.hadoop.mapreduce.JobSubmitter - Kind: RM_DELEGATION_TOKEN, Service: 100.86.204.54:9010, Ident: (owner=cornac, renewer=mr token, realUser=hdp, issueDate=1395402643673, maxDate=1396007443673, sequenceNumber=5, masterKeyId=2)
2014-03-21 11:51:17,154 [JobControl] INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1395391185318_0006 to ResourceManager at headnode0/100.86.204.54:9010
2014-03-21 11:51:17,232 [JobControl] INFO org.apache.hadoop.mapreduce.Job - The url to track the job: http://headnode0:9014/proxy/application_1395391185318_0006/
2014-03-21 11:51:17,232 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_1395391185318_0006
2014-03-21 11:51:17,232 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases a,b
2014-03-21 11:51:17,232 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: a[3,4],b[-1,-1] C: R:
2014-03-21 11:51:17,279 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2014-03-21 11:51:34,575 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete
2014-03-21 11:51:37,981 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
2014-03-21 11:51:38,028 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2014-03-21 11:51:38,028 [main] INFO org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics:
HadoopVersion PigVersion UserId StartedAt FinishedAt Features
2.2.0.2.0.7.0-1551 0.12.0.2.0.7.0-1551 hdp 2014-03-21 11:51:06 2014-03-21 11:51:38 UNKNOWN
Success!
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_1395391185318_0006 1 0 6 6 6 6 n/a n/a n/a n/a a,b MAP_ONLY /wasbwork/pigresult,
Input(s):
Successfully read 260 records from: "wasb://[email protected]/data/ref_villes"
Output(s):
Successfully stored 260 records in: "/wasbwork/pigresult"
Counters:
Total records written : 260
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0
Job DAG:
job_1395391185318_0006
2014-03-21 11:51:38,278 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
Benjamin (@benjguin)
Blog Post by: Benjamin GUINEBERTIERE