python and hadoop

Anyone here using Python and Hadoop streaming to process data? I am just dipping my toes into both and struggling. All the examples I've found on the web are 'word count' examples; I would love to find more. I have a specific task I need to achieve and have been able to solve it by writing the Python code, but it currently only works on CSV files...
Thanks ahead of time to anyone interested in helping or pointing me in the right direction.
Cheers,
Eric
Apr 21 '10 #1

✓ answered by bvdet (see reply #8 below)

bvdet
I have never heard of Hadoop streaming. What kind of data is it, and how do you want to process it? If you could show us an example of the data, maybe we can help.
Apr 21 '10 #2
Glenton
Yeah, I've also not come across Hadoop. But we've often been able to help without knowing all about it! Give us your code and more description of what you're trying to do, and why it doesn't work.
Apr 22 '10 #3
erbrose
Thanks for both of your replies!!!!
Well, here is the deal. I have a couple of terabytes worth of text (CSV) data that I need to loop through. The data is currently stored on a Hadoop cluster, which basically breaks the data up into smaller chunks and distributes it across multiple machines; when you run a Python job via Hadoop streaming, the job runs on all the different machines, thus (in theory) speeding up the processing via a map and reduce step. My sample data has a unique identifier and an integer value that will be used to create averages, medians, standard deviations, percentiles, and (eventually) some other important stats. Again, I am just dipping my toes into Python, so you have to forgive my lengthy code! I know there are going to be tons of shortcuts that hopefully I will learn. Here is what I have so far.
Basically I have one text file that contains a unique set of ID's.

ID
101
102

Then my 'big' data set looks something like this (but it is not sorted like this, i.e. the IDs are all over the place):

ID----INT
101---23
101---25
101---20
101---19
101---30
101---28
101---27
101---27
101---23
102---30
102---28
102---27
102---27
102---23

Right now my code is NOT using any math or sci modules (which I am sure will be the way to go to smooth this process out); for now I am manually calculating the values just to better understand Python.


import time

unique = open("d:/temp/tmp/ID.txt", 'r')
writer = open("d:/temp/tmp/ID_out.txt", 'w')
t1 = time.clock()
avg_val=float()
b = int(0)
c = int(0)
i = int(0)
j = int(0)
Arr = []
SortArr = []
k = int(0)
l = int(0)
LoopArr = []
LoopCntr = int(0)
# read the list of unique IDs into LoopArr
for ID in unique:
    ID = ID.strip()
    LoopArr.append(ID)
unique.close()

# for each unique ID, re-scan the whole sample file and collect its rows into SortArr
for m in LoopArr:
    reader = open("d:/temp/tmp/sample.txt",'r')

    while True:
        line = reader.readline()

        if len(line) != 0:
            line = line.strip()

            TmpArr = line.split(',')

            if m == TmpArr[0]:
                StrTmp=str(TmpArr[0])+','+str(TmpArr[1])+','+str('1')
                SortArr.append(StrTmp.split(','))
                i = i + 1
        else:
            break

    reader.close()
    # i is the number of rows found for this ID; compute its mean and median
    SortArr.sort(key=lambda a: a[1])
    median_val=int(0)
    if i == 0:
        median_val=SortArr[i][1]
        avg_val=SortArr[i][1]

    elif i == 1:
        for a in SortArr:
            c = c + int(SortArr[b][1])
            b = b + 1
        median_val = c/i
        avg_val = c/i

    elif i == 2:
        median_val=SortArr[i-1][1]
        for a in SortArr:
            c = c + int(SortArr[b][1])
            b = b + 1
        avg_val = c/i

    elif i%2==0:
        median_val=(int(SortArr[i/2][1])+int(SortArr[(i/2)+1][1]))/2
        for a in SortArr:
            c = c + int(SortArr[b][1])
            b = b + 1
        avg_val = c/i

    else:
        median_val=int(SortArr[i/2][1])
        for a in SortArr:
            c = c + int(SortArr[b][1])
            b = b + 1
        avg_val = c/i

    # write one output row (ID, mean, median, count) and reset the accumulators
    ID = m
    SortArr = []
    TempString = str(ID) + ',' + str(avg_val) + ',' + str(median_val) + ',' + str(i) + '\n'
    writer.write(TempString)
    i = int(0)
    b = 0
    avg_val = 0
    median_val = 0
    c = 0
writer.close()
t2 = time.clock() - t1
print t2
Sorry my code is not commented very well at the moment, basically because I keep trying new things!
Here is a link to the basic "word count" example using Python and Hadoop streaming:
http://www.michael-noll.com/wiki/Wri...gram_In_Python
It's really the only clear example I've been able to find of someone using Python with Hadoop, although I know there are a lot of folks out there using it too!
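To give a sense of what that example looks like, the gist of it is a mapper and a reducer that each just read stdin and write stdout; a minimal sketch along those lines (the file names mapper.py/reducer.py and the tab-separated key/value format are only the usual streaming conventions, nothing specific to my data):

#!/usr/bin/env python
# mapper.py -- emit "word<TAB>1" for every word read from stdin
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print("%s\t1" % word)

# ------------------------------------------------------------------
#!/usr/bin/env python
# reducer.py -- sum the counts per word; Hadoop streaming delivers the
# mapper output sorted by key, so equal words arrive on consecutive lines
import sys

current_word = None
current_count = 0

for line in sys.stdin:
    word, count = line.strip().split('\t', 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print("%s\t%d" % (current_word, current_count))
        current_word = word
        current_count = int(count)

if current_word is not None:
    print("%s\t%d" % (current_word, current_count))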
Thanks a bunch for helping me in any way shape or form!
-Eric
Apr 22 '10 #4
Glenton
Okay, so it's not entirely clear what the problem is, but it doesn't look like it will run very efficiently, if at all, at the moment!

The first question is: how many unique IDs are there?

If there's a reasonable number, then one approach would be the following (a rough sketch follows below):
1. Set up a sorted list of the IDs.
2. Go through the large data set one line at a time ("for line in file:" is the normal way of doing this).
3. For each line, pull out the ID, use a bisection search (the bisect module) to find its slot in the sorted list, and update the running calculations for whatever you want to compute.
4. Save to a file every so often.

But if there's an unreasonable number of IDs then we'd have to reconsider...
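For the reasonable case, something along these lines, as an untested sketch (it reuses the paths from your post, the output name is made up, and it only tracks counts and sums, so medians/percentiles would need the individual values kept per ID):

import bisect

# sorted list of the unique IDs, read once up front
ids = sorted(line.strip() for line in open("d:/temp/tmp/ID.txt"))

# one accumulator per ID: [count, running_sum]
stats = [[0, 0] for _ in ids]

for line in open("d:/temp/tmp/sample.txt"):
    ID, val = line.strip().split(',', 1)
    pos = bisect.bisect_left(ids, ID)      # binary search in the sorted ID list
    if pos < len(ids) and ids[pos] == ID:
        stats[pos][0] += 1
        stats[pos][1] += int(val)

# write ID, mean, count (in the real thing, flush every so often)
out = open("d:/temp/tmp/bisect_out.csv", "w")
for ID, (count, total) in zip(ids, stats):
    if count:
        out.write("%s,%s,%s\n" % (ID, float(total) / count, count))
out.close()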

Let us know.
Apr 22 '10 #5
erbrose
Thanks again for your reply!
Well, I did some more research yesterday and it appears that with Hadoop MapReduce (via streaming) you can set the reducer to NONE, so you only have to run one script. So, on that note, I am playing around with a sample data set of 39 million rows. My sample contains ~20k unique IDs; in the real terabytes worth of data there will be roughly 27 million unique IDs. Yesterday I was able to do a simple process that looped through the sample, put it into an array, sorted that array by ID and VAL, then printed the sorted array out to a file, via this:

import sys

TmpArr = []
for line in sys.stdin:
    line = line.strip()
    TmpArr.append(line.split(','))
TmpArr.sort(key=lambda a:(a[0],int(a[1])))
for row_ in TmpArr:
    print (row_)
That's all good for my first Hadoop streaming job! I then got the idea to try and create two structures (a sorted list and a unique dict of IDs).

import sys

TmpArr = []
Unique = {}
for line in sys.stdin:
    line = line.strip()
    TmpArr.append(line.split(','))
    ID,VAL = line.split(',',1)
    Unique[ID] = Unique.get(ID,0)
TmpArr.sort(key=lambda a:(a[0],int(a[1])))
for row_ in TmpArr:
    print (row_)
Again this worked just great. My next thought was to create two loops: the main one looping through my Unique dict, then for each entry looping through my TmpArr and doing all my math. But for some reason Hadoop doesn't like that; I kept getting a time-out error.

import sys

TmpArr = []
Unique = {}
for line in sys.stdin:
    line = line.strip()
    TmpArr.append(line.split(','))
    ID,VAL = line.split(',',1)
    Unique[ID] = Unique.get(ID,0)
TmpArr.sort(key=lambda a:(a[0],int(a[1])))

for m in Unique:
    try:
        for n in TmpArr:
            pass    # do a bunch of math here
    except ValueError:
        pass
for row_ in TmpArr:
    print (row_)
Today I am going to just try to loop through my sorted TmpArr and use some simple if statements, something like: if TmpArr[i] and TmpArr[i+1] have the same ID, output that value to another temp array. Will let you know (hopefully by the end of the day) how this process goes.
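Here is roughly what I have in mind, as an untested sketch using itertools.groupby on the sorted rows (the mean/median lines are just stand-ins for the full set of stats I eventually need):

import sys
import itertools

rows = []
for line in sys.stdin:
    ID, val = line.strip().split(',', 1)
    rows.append((ID, int(val)))
rows.sort()                      # sort by ID, then by value

# groupby yields one group per run of identical IDs in the sorted data
for ID, group in itertools.groupby(rows, key=lambda r: r[0]):
    vals = sorted(v for _, v in group)
    n = len(vals)
    avg = float(sum(vals)) / n
    mid = n // 2
    if n % 2:
        median = vals[mid]
    else:
        median = (vals[mid - 1] + vals[mid]) / 2.0
    print("%s,%s,%s,%d" % (ID, avg, median, n))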
Apr 23 '10 #6
erbrose
Hey all!
Well, I am now able to run this code on my sample CSV file (3 million rows) on my desktop, and it completes in under a minute, which is great! Still having issues on the Hadoop end, but I think that problem is not for this forum. I would still appreciate any suggestions or improvements on the code itself, as I'm still very much a newbie!

import time
t1 = time.clock()
TmpArr = []
Unique = []
SortArr = []
OutArr = []
i = int(0)
l = int(0)
b = int(0)
c = int(0)
n = int(0)
reader = open("d:/temp/tmp/Input.csv",'r')
writer = open("d:/temp/tmp/Out.csv",'w')

for line in reader:
    line = line.strip()
    TmpArr.append(line.split(','))
TmpArr.sort(key=lambda a:(a[0],int(a[1])))

reader.close()

for m in TmpArr:

    StrTemp = str(TmpArr[n][0]) + ',' + str(TmpArr[n][1])
    SortArr.append(StrTemp.split(','))
    #if first row go ahead and put into unique array
    if i == 0:
        Unique.append(StrTemp.split(','))
    else:
        if SortArr[i][0]==SortArr[i-1][0]:
            StrTemp = str(TmpArr[i][0]) + ',' + str(TmpArr[i][1])
            Unique.append(StrTemp.split(','))
        else:
            try:
                l = len(Unique)
                if l == 0:
                    median_val=Unique[l][1]
                    avg_val=Unique[l][1]
                elif l == 1:
                    for a in Unique:
                        c = c + int(Unique[b][1])
                        b = b + 1
                    median_val = c/l
                    avg_val = c/l
                elif l == 2:
                    median_val=Unique[l-1][1]
                    for a in Unique:
                        c = c + int(Unique[b][1])
                        b = b + 1
                    avg_val = c/l

                elif l%2==0:
                    median_val=(int(Unique[l/2][1])+int(Unique[(l/2)+1][1]))/2
                    for a in Unique:
                        c = c + int(Unique[b][1])
                        b = b + 1
                    avg_val = c/l
                else:
                    median_val=int(Unique[l/2][1])
                    for a in Unique:
                        c = c + int(Unique[b][1])
                        b = b + 1
                    avg_val = c/l

                ID = Unique[0][0]
                TempString = str(ID) + ',' + str(avg_val) + ',' + str(median_val) + ',' + str(l) + '\n'
                writer.write(TempString)
                Unique = []
                SortArr = []
                SortArr.append(StrTemp.split(','))
                Unique.append(StrTemp.split(','))
                OutArr.append(TempString.split(','))
                i = 0
                b = int(0)
                avg_val = int(0)
                median_val = int(0)
                c = int(0)
            except ValueError:
                pass

    i = i + 1
    n = n + 1

t2 = time.clock() - t1
writer.write(str(t2))
writer.close()
Apr 26 '10 #7
bvdet
Good work erbrose! I have a few minor comments.

There is no need to initialize variables as integers: i = 0 and i = int(0) give you identical results.

It looks like you are using the variable n as an index into your list TmpArr. You don't need it; by iterating over the list directly, you can do this:

for item in TmpArr:
    StrTemp = str(item[0]) + ',' + str(item[1])

I like to use item for the variable in that situation. If you need an index for another reason:

for j, item in enumerate(TmpArr):

Augmented assignment:

    i += 1

Keep up the good work!
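Put together, here is a tiny self-contained illustration of those points (the data below is just made up for the example):

# made-up sample rows in the same ID,value shape as your data
TmpArr = [['101', '23'], ['101', '25'], ['102', '30']]

total = 0
for j, item in enumerate(TmpArr):       # index and row in one go
    StrTemp = item[0] + ',' + item[1]   # no separate counter variable needed
    total += int(item[1])               # augmented assignment
    print("row %d -> %s" % (j, StrTemp))
print("sum of the values: %d" % total)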
Apr 26 '10 #8
erbrose
One last post (on this subject anyhow!): just wanted to let you know that I was able to complete my first Map/Reduce job on Hadoop with Python! Thanks again for all your help!
Apr 26 '10 #9
