Anyone here using Python and Hadoop Streaming to process data? I am just dipping my toes into both and struggling. All the examples I've found on the web are the 'Word Count' examples; I would love to find more. I have a specific task I need to achieve, and I have been able to solve it by writing Python code, but it currently only works on csv files...
Thanks ahead of time for anyone interested in helping or pointing me in the right direction.
Cheers,
Eric
I have never heard of hadoop streaming. What kind of data is it, and how do you want to process it? If you could show us an example of the data, maybe we can help.
Yeah, I've also not come across hadoop. But we've often been able to help without knowing about all of it! Give us your code and more description of what you're trying to do, and why it doesn't work.
Thanks for both of your replies!!!!
Well, here is the deal. I have a couple of terabytes worth of text (csv) data that I need to loop through. The data is currently stored on a Hadoop cluster, which basically breaks the data up into smaller chunks and distributes it across multiple machines; when you run a Python job via Hadoop Streaming, it processes the job on all the different machines, thus (in theory) speeding up the process via a Map and Reduce process. My sample data has a unique identifier and an integer value that will be used to create averages, medians, standard deviations, percentiles, and (eventually) some other important stats. Again, I am just dipping my toes into Python, so you have to forgive my lengthy code! I know there are going to be tons of shortcuts that hopefully I will learn. Here is what I have so far.
Basically I have one text file that contains a unique set of ID's.
ID
101
102
Then my 'big' data set looks something like this (but it is not sorted like this, i.e. the IDs are all over the place):
ID----INT
101---23
101---25
101---20
101---19
101---30
101---28
101---27
101---27
101---23
102---30
102---28
102---27
102---27
102---23
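For what it's worth, here is a minimal sketch (not the code from this thread) of the per-ID averages and medians described above, using a dict of lists and Python's standard statistics module; the rows list is made-up toy data in the same "ID,INT" shape as the sample:

```python
import statistics
from collections import defaultdict

# Toy rows in the same "ID,INT" shape as the sample data above.
rows = ["101,23", "101,25", "101,20", "102,30", "102,28"]

# Group the integer values under their ID.
values = defaultdict(list)
for line in rows:
    key, val = line.strip().split(",")
    values[key].append(int(val))

# Per-ID average and median; standard deviation and percentiles
# would follow the same pattern on each value list.
stats = {k: (statistics.mean(v), statistics.median(v)) for k, v in values.items()}
```

statistics.stdev (and sorting each list to pick off percentiles) would cover the other stats the same way.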
Right now my code is NOT using any math or sci modules (which I am sure will be the way to go to smooth this process out), but for now I am manually calculating the values just to better understand Python.

```python
import time

unique = open("d:/temp/tmp/ID.txt", 'r')
writer = open("d:/temp/tmp/ID_out.txt", 'w')
t1 = time.clock()
avg_val = float()
b = int(0)
c = int(0)
i = int(0)
j = int(0)
Arr = []
SortArr = []
k = int(0)
l = int(0)
LoopArr = []
LoopCntr = int(0)
for ID in unique:
    ID = ID.strip()
    LoopArr.append(ID)
unique.close()

for m in LoopArr:
    reader = open("d:/temp/tmp/sample.txt", 'r')
    while True:
        line = reader.readline()
        if len(line) != 0:
            line = line.strip()
            TmpArr = line.split(',')
            if m == TmpArr[0]:
                StrTmp = str(TmpArr[0]) + ',' + str(TmpArr[1]) + ',' + str('1')
                SortArr.append(StrTmp.split(','))
                i = i + 1
        else:
            break
    reader.close()
    SortArr.sort(key=lambda a: a[1])
    median_val = int(0)
    if i == 0:
        median_val = SortArr[i][1]
        avg_val = SortArr[i][1]
    elif i == 1:
        for a in SortArr:
            c = c + int(SortArr[b][1])
            b = b + 1
        median_val = c / i
        avg_val = c / i
    elif i == 2:
        median_val = SortArr[i - 1][1]
        for a in SortArr:
            c = c + int(SortArr[b][1])
            b = b + 1
        avg_val = c / i
    elif i % 2 == 0:
        median_val = (int(SortArr[i / 2][1]) + int(SortArr[(i / 2) + 1][1])) / 2
        for a in SortArr:
            c = c + int(SortArr[b][1])
            b = b + 1
        avg_val = c / i
    else:
        median_val = int(SortArr[i / 2][1])
        for a in SortArr:
            c = c + int(SortArr[b][1])
            b = b + 1
        avg_val = c / i
    ID = m
    SortArr = []
    TempString = str(ID) + ',' + str(avg_val) + ',' + str(median_val) + ',' + str(i) + '\n'
    writer.write(TempString)
    i = int(0)
    b = 0
    avg_val = 0
    median_val = 0
    c = 0
writer.close()
t2 = time.clock() - t1
print t2
```
Sorry my code is not commented very well at the moment, basically because I keep trying new things!
Here is a link for the basic "word count" using Python and Hadoop: http://www.michael-noll.com/wiki/Wri...gram_In_Python
It's really the only clear example I've been able to find of someone using Python and Hadoop, although I know there are a lot of folks out there using it!
Thanks a bunch for helping me in any way shape or form!
-Eric
Okay, so it's not entirely clear what the problem is, but it doesn't look like it will run very efficiently, if at all, at the moment!
The first question is how many unique IDs are there?
If there's a reasonable number then an approach would be to do the following:
1. set up an array with a sorted list of the IDs
2. go through the large data set a line at a time ("for line in file:" is the normal way of doing this).
3. for each line, pull out the ID, use a bisection search (the bisect module) to find it in the sorted list, and update each of the running calculations you want.
4. save every so often to a file
But if there's an unreasonable number of IDs then we'd have to reconsider...
Let us know.
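The steps above could be sketched roughly like this (the IDs and data lines here are made up; in the real job the big file would be read with "for line in file:"):

```python
import bisect

# Step 1: a sorted list of the IDs, with parallel running totals.
ids = sorted(["101", "102"])
sums = [0] * len(ids)
counts = [0] * len(ids)

# Step 2: go through the large data set one line at a time (toy lines here).
lines = ["101,23", "102,30", "101,19"]
for line in lines:
    key, val = line.strip().split(",")
    # Step 3: binary search for the ID, then update the running calculations.
    pos = bisect.bisect_left(ids, key)
    if pos < len(ids) and ids[pos] == key:
        sums[pos] += int(val)
        counts[pos] += 1

# Step 4 would write results out to a file every so often;
# here we just compute the averages at the end.
averages = {ids[j]: sums[j] / counts[j] for j in range(len(ids))}
```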
Thanks again for your reply!
Well, I did some more research yesterday, and it appears that with Hadoop Map Reduce (via Streaming) you can set the Reduce to NONE, so that you only have to execute one script. So on that note... I am playing around with a sample data set of 39 million rows. My sample contains ~20k unique IDs; in the real terabytes worth of data there will be roughly 27 million unique IDs. Yesterday I was able to do a simple process that looped through the sample and put it into an array, sorted that array by ID and VAL, then printed the sorted array out to a file via this:

```python
import sys

TmpArr = []
for line in sys.stdin:
    line = line.strip()
    TmpArr.append(line.split(','))
TmpArr.sort(key=lambda a: (a[0], int(a[1])))
for row_ in TmpArr:
    print (row_)
```
That's all good for my first Hadoop Streaming job! I then got the idea to try and create two structures (a sorted list and a unique dict of IDs):

```python
import sys

TmpArr = []
Unique = {}
for line in sys.stdin:
    line = line.strip()
    TmpArr.append(line.split(','))
    ID, VAL = line.split(',', 1)
    Unique[ID] = Unique.get(ID, 0)
TmpArr.sort(key=lambda a: (a[0], int(a[1])))
for row_ in TmpArr:
    print (row_)
```
-
Again this worked just great. My next thought was to create two loops: the main one looping through my Unique dict, then for each ID looping through my TmpArr to do all my math. But for some reason Hadoop doesn't like that; I kept getting a time-out error (with ~20k IDs each scanning all 39 million rows, that inner loop runs hundreds of billions of times):

```python
import sys

TmpArr = []
Unique = {}
for line in sys.stdin:
    line = line.strip()
    TmpArr.append(line.split(','))
    ID, VAL = line.split(',', 1)
    Unique[ID] = Unique.get(ID, 0)
TmpArr.sort(key=lambda a: (a[0], int(a[1])))

for m in Unique:
    try:
        for n in TmpArr:
            # do a bunch of math
            pass
    except ValueError:
        pass
for row_ in TmpArr:
    print (row_)
```
Today I am going to just try to loop through my sorted TmpArr and do some simple if statements, something like: if TmpArr[i][0] == TmpArr[i+1][0], then output that value to another temp array. Will let you know (hopefully by the end of the day) how this process goes.
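For what it's worth, that "compare each row to the next" pass over a sorted list is exactly what itertools.groupby does. A minimal sketch with made-up rows (not the actual code in this thread):

```python
from itertools import groupby

# Toy rows, already sorted by (ID, value) as in the scripts above.
TmpArr = [["101", "19"], ["101", "23"], ["102", "28"], ["102", "30"]]

out = []
# groupby yields one group per run of equal keys in the sorted list.
for key, group in groupby(TmpArr, key=lambda row: row[0]):
    vals = [int(row[1]) for row in group]              # all values for one ID
    out.append((key, sum(vals) / len(vals), len(vals)))  # ID, average, count
```

The same per-group value list would also feed the median and percentile calculations.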
Hey all!
Well, I am now able to run this code on my sample csv file (3 million rows) on my desktop. It completes in under 1 minute, which is great! Still having issues on the Hadoop end, but I think that problem is not for this forum. I would still appreciate any suggestions or improvements on the code itself, as I'm still very much a newbie!

```python
import time

t1 = time.clock()
TmpArr = []
Unique = []
SortArr = []
OutArr = []
i = int(0)
l = int(0)
b = int(0)
c = int(0)
n = int(0)
reader = open("d:/temp/tmp/Input.csv", 'r')
writer = open("d:/temp/tmp/Out.csv", 'w')

for line in reader:
    line = line.strip()
    TmpArr.append(line.split(','))
TmpArr.sort(key=lambda a: (a[0], int(a[1])))
reader.close()

for m in TmpArr:
    StrTemp = str(TmpArr[n][0]) + ',' + str(TmpArr[n][1])
    SortArr.append(StrTemp.split(','))
    # if first row go ahead and put into unique array
    if i == 0:
        Unique.append(StrTemp.split(','))
    else:
        if SortArr[i][0] == SortArr[i - 1][0]:
            StrTemp = str(TmpArr[i][0]) + ',' + str(TmpArr[i][1])
            Unique.append(StrTemp.split(','))
        else:
            try:
                l = len(Unique)
                if l == 0:
                    median_val = Unique[l][1]
                    avg_val = Unique[l][1]
                elif l == 1:
                    for a in Unique:
                        c = c + int(Unique[b][1])
                        b = b + 1
                    median_val = c / l
                    avg_val = c / l
                elif l == 2:
                    median_val = Unique[l - 1][1]
                    for a in Unique:
                        c = c + int(Unique[b][1])
                        b = b + 1
                    avg_val = c / l
                elif l % 2 == 0:
                    median_val = (int(Unique[l / 2][1]) + int(Unique[(l / 2) + 1][1])) / 2
                    for a in Unique:
                        c = c + int(Unique[b][1])
                        b = b + 1
                    avg_val = c / l
                else:
                    median_val = int(Unique[l / 2][1])
                    for a in Unique:
                        c = c + int(Unique[b][1])
                        b = b + 1
                    avg_val = c / l

                ID = Unique[0][0]
                TempString = str(ID) + ',' + str(avg_val) + ',' + str(median_val) + ',' + str(l) + '\n'
                writer.write(TempString)
                Unique = []
                SortArr = []
                SortArr.append(StrTemp.split(','))
                Unique.append(StrTemp.split(','))
                OutArr.append(TempString.split(','))
                i = 0
                b = int(0)
                avg_val = int(0)
                median_val = int(0)
                c = int(0)
            except ValueError:
                pass
    i = i + 1
    n = n + 1

t2 = time.clock() - t1
writer.write(str(t2))
writer.close()
```
bvdet:
Good work erbrose! I have a few minor comments.
There is no need to initialize variables as integers: i = 0 and i = int(0) will give you identical results.
It looks like you are using the variable n as an index into your list TmpArr. You don't need it; by iterating over the list directly, you can do this:

```python
for item in TmpArr:
    StrTemp = str(item[0]) + ',' + str(item[1])
```

I like to use item for the variable in that situation. If you need an index for another reason:

```python
for j, item in enumerate(TmpArr):
```

Augmented assignment: i += 1 does the same thing as i = i + 1.
Keep up the good work!
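Putting those three suggestions together on a toy list (hypothetical data, just to illustrate the style):

```python
TmpArr = [["101", "23"], ["102", "30"]]

total = 0                            # plain 0; no int(0) needed
pairs = []
for j, item in enumerate(TmpArr):    # index j comes for free, no manual counter
    pairs.append(str(item[0]) + ',' + str(item[1]))
    total += int(item[1])            # augmented assignment
```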
One last post (on this subject anyhow!): just wanted to let you know that I was able to complete my first Map/Reduce job on Hadoop with Python! Thanks again for all your help!