Hey all.
(hopefully) a quick question here. I am processing data using Hadoop Streaming Map/Reduce.. the map.py is straight forward.. basically takes the input data (in the form of sys.stdin), loads it into a list, sorts that list, then... well not exactly sure what hadoop does with that, but pretty sure it creates a temporarly file much like a csv in memory - for line in sys.stdin:
-
<append into a list then sort>
-
for m in TmpArr:
-
print m
I then have a reduce.py that takes each line from the mysterious hadoop temp file and loads it as sys.stdin... like this... - for line in sys.stdin:
-
<load into temporary list and do some stuff>
my issue is as soon as line = nothing, the process ends... even if there is still data to process. Is there an error checking way to fix this with stdin?
an example would be this... my table looks like this
ID---VAL
01--20
01--22
01--25
02--10
02--15
02--17
03--5
03--7
my output SHOULD look like this
ID---AVG---COUNT
01--22.3--3
02--14.0--3
03--6.0--2
but its coming out like this
ID---AVG---COUNT
01--22.3--3
02--14.0--3
Sorry this is so long winded and thanks for any input. Also, i could post my whole code if needed but its a bit long winded too!
Cheers,
Eric
Er...I don't really understand what the question is now. What's the pseudocode for what you're trying to do?
The trickiness of what you're doing is not the calculation, but handling the large dataset, right? If you had a way of converting your keys uniquely into 0,...,n-1 you might find it easier to create an array/list which you just update on the fly.
4 1582
i am messing around with just running the reducer.py with a txt file and am able to process the whole file by adding this - while True:
-
line = reader.readline()
-
if len(line) != 0:
-
<my code here>
-
else:
-
<repeat my code here>
-
seems slightly wrong to have to repeat all my code in the if and the else but it works... am not able to get it to work using sys.stdin...
Thanks again
Okay, that's interesting. In your second example, I assume reader is an object of a text file (ie reader=open("whatever.txt") or something?).
If so, it should work to go: - for line in reader:
-
<your code here>
I'm assuming that when the length of the line is zero, you just want to carry on to the next line? So you could do this like this: - for line in reader:
-
if len(line)==0: continue
-
<your code here>
I definitely can't see any reason to repeat your code!
Regarding the sys.stdin, I suppose it goes until it hits a blank line and then the iterator stops running. I've not used this kind of thing, but can imagine that would be problematic. I would suppose that the easier fix is in your reduce.py. Perhaps you can ensure that it goes through the whole hadoop file there before the iterator ends.
Perhaps you could post reduce.py? I'm guessing that it could be written neatly as a class with an iterator. And that it's not written like that now ;P
Thanks!
Alright.. well the only reason I check for line == 0 (or in this case line = "") is the actual end of file.. there will be no NULL lines from the map input. I am still having to pretty much duplicate the code as you see. The code is all over the place too as im still in debug mode... but it is working properly with a csv file as the input.. I am calculating the average, standard deviation, median, min and max values too. Will eventually look into Numpy or Scipy, but for now calculating values the old fashion way -
#!/usr/bin/python
-
import sys
-
import math
-
TmpArr = []
-
Unique = []
-
SortArr = []
-
OutArr = []
-
tmp_avgspd = float(0)
-
sqr_sum = float(0)
-
i = int(0)
-
l = int(0)
-
b = int(0)
-
c = int(0)
-
n = int(0)
-
j = int(0)
-
k = float(0)
-
a = int(0)
-
devsum = float(0)
-
deviation = []
-
reader = open("d:/temp/tmp/sample.csv",'r')
-
-
while True:
-
line = reader.readline()
-
-
line = line.strip()
-
StrTemp = line
-
TmpArr.append(line.split(','))
-
SortArr.append(line.split(','))
-
#if last line.. finish processing stuff in my Unique List
-
if line == "":
-
l = len(Unique)
-
if l == 0:
-
median_val=Unique[l][1]
-
avg_val=Unique[l][1]
-
min_val = Unique[l][1]
-
std_dev=0.0
-
elif l == 1:
-
median_val=Unique[l-1][1]
-
avg_val=Unique[l-1][1]
-
std_dev=0.0
-
min_val = Unique[l-1][1]
-
max_val = Unique[l-1][1]
-
elif l == 2:
-
for a in Unique:
-
c = c + int(Unique[b][1])
-
b = b + 1
-
avg_val = c/l
-
median_val = c/l
-
tmp_avgspd = float(c)/float(l)
-
b = 0
-
c = 0
-
for a in Unique:
-
deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
-
b = b + 1
-
b = 0
-
for a in deviation:
-
devsum = devsum + float(deviation[b])
-
b = b + 1
-
devsum = devsum/1.0
-
sqr_sum = math.sqrt(devsum)
-
std_dev = round(sqr_sum,3)
-
min_val = Unique[0][1]
-
max_val = Unique[l-1][1]
-
elif l%2==0:
-
median_val=(int(Unique[l/2][1])+int(Unique[(l/2)+1][1]))/2
-
for a in Unique:
-
c = c + int(Unique[b][1])
-
b = b + 1
-
avg_val = c/l
-
tmp_avgspd = float(c)/float(l)
-
b = 0
-
c = 0
-
for a in Unique:
-
deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
-
b = b + 1
-
b = 0
-
devsum = 0.0
-
for a in deviation:
-
devsum = devsum + float(deviation[b])
-
b = b + 1
-
devsum2 = devsum
-
k = l - 1
-
devsum = devsum/k
-
sqr_sum = math.sqrt(devsum)
-
std_dev = round(sqr_sum,3)
-
min_val = Unique[0][1]
-
max_val = Unique[l-1][1]
-
else:
-
median_val=Unique[l/2][1]
-
for a in Unique:
-
d = Unique[b][1]
-
d = int(d)
-
c = c + d
-
b = b + 1
-
avg_val = c/l
-
tmp_avgspd = float(c)/float(l)
-
b = 0
-
c = 0
-
for a in Unique:
-
deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
-
b = b + 1
-
b = 0
-
devsum = 0.0
-
for a in deviation:
-
devsum = devsum + float(deviation[b])
-
b = b + 1
-
devsum2 = devsum
-
k = l - 1
-
devsum = devsum/k
-
sqr_sum = math.sqrt(devsum)
-
std_dev = round(sqr_sum,3)
-
min_val = Unique[0][1]
-
max_val = Unique[l-1][1]
-
-
id = Unique[0][0]
-
TempString = str(id) + ',' + str(avg_val) + ',' + str(median_val) + ',' + str(std_dev) + ',' + str(min_val) + ',' + str(max_val) + ',' + str(l)
-
print TempString
-
break
-
else:
-
-
#if first row go ahead and put into unique array
-
if i == 0:
-
Unique.append(line.split(','))
-
else:
-
#print Unique
-
-
if SortArr[i][0]==SortArr[i-1][0]:
-
#StrTemp = str(TmpArr[j][0]) + ',' + str(TmpArr[j][1])
-
StrTemp = str(TmpArr[j][0]) + ',' + str(TmpArr[j][1])
-
Unique.append(StrTemp.split(','))
-
else:
-
-
l = len(Unique)
-
if l == 0:
-
median_val=Unique[l][1]
-
avg_val=Unique[l][1]
-
min_val = Unique[l][1]
-
max_val = Unique[l][1]
-
std_dev=0.0
-
elif l == 1:
-
median_val=Unique[l-1][1]
-
avg_val=Unique[l-1][1]
-
std_dev=0.0
-
min_val = Unique[l-1][1]
-
max_val = Unique[l-1][1]
-
elif l == 2:
-
for a in Unique:
-
c = c + int(Unique[b][1])
-
b = b + 1
-
avg_val = c/l
-
median_val = c/l
-
tmp_avgspd = float(c)/float(l)
-
b = 0
-
c = 0
-
for a in Unique:
-
deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
-
b = b + 1
-
b = 0
-
for a in deviation:
-
devsum = devsum + float(deviation[b])
-
b = b + 1
-
devsum = devsum/1.0
-
sqr_sum = math.sqrt(devsum)
-
std_dev = round(sqr_sum,3)
-
min_val = Unique[0][1]
-
max_val = Unique[l-1][1]
-
elif l%2==0:
-
median_val=(int(Unique[l/2][1])+int(Unique[(l/2)+1][1]))/2
-
for a in Unique:
-
c = c + int(Unique[b][1])
-
b = b + 1
-
avg_val = c/l
-
tmp_avgspd = float(c)/float(l)
-
b = 0
-
c = 0
-
for a in Unique:
-
deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
-
b = b + 1
-
b = 0
-
devsum = 0.0
-
for a in deviation:
-
devsum = devsum + float(deviation[b])
-
b = b + 1
-
devsum2 = devsum
-
k = l - 1
-
devsum = devsum/k
-
sqr_sum = math.sqrt(devsum)
-
std_dev = round(sqr_sum,3)
-
min_val = Unique[0][1]
-
max_val = Unique[l-1][1]
-
else:
-
median_val=Unique[l/2][1]
-
for a in Unique:
-
d = Unique[b][1]
-
d = int(d)
-
c = c + d
-
b = b + 1
-
avg_val = c/l
-
tmp_avgspd = float(c)/float(l)
-
b = 0
-
c = 0
-
for a in Unique:
-
deviation.append(float((float(Unique[b][1])-tmp_avgspd)*(float(Unique[b][1])-tmp_avgspd)))
-
b = b + 1
-
b = 0
-
devsum = 0.0
-
for a in deviation:
-
devsum = devsum + float(deviation[b])
-
b = b + 1
-
devsum2 = devsum
-
k = l - 1
-
devsum = devsum/k
-
sqr_sum = math.sqrt(devsum)
-
std_dev = round(sqr_sum,3)
-
min_val = Unique[0][1]
-
max_val = Unique[l-1][1]
-
-
id = Unique[0][0]
-
TempString = str(id) + ',' + str(avg_val) + ',' + str(median_val) + ',' + str(std_dev) + ',' + str(min_val) + ',' + str(max_val) + ',' + str(l)
-
print TempString
-
deviation = []
-
a = len(TmpArr)
-
-
Unique = []
-
SortArr = []
-
SortArr.append(StrTemp.split(','))
-
Unique.append(StrTemp.split(','))
-
i = 0
-
b = int(0)
-
avg_val = int(0)
-
median_val = int(0)
-
c = int(0)
-
-
-
i = i + 1
-
n = n + 1
-
j = j + 1
-
Er...I don't really understand what the question is now. What's the pseudocode for what you're trying to do?
The trickiness of what you're doing is not the calculation, but handling the large dataset, right? If you had a way of converting your keys uniquely into 0,...,n-1 you might find it easier to create an array/list which you just update on the fly.
Sign in to post your reply or Sign up for a free account.
Similar topics
by: Willem Ligtenberg |
last post by:
I decided to use SAX to parse my xml file.
But the parser crashes on:
File "/usr/lib/python2.3/site-packages/_xmlplus/sax/handler.py", line 38, in fatalError
raise exception...
|
by: Cigdem |
last post by:
Hello,
I am trying to parse the XML files that the user selects(XML files are
on anoher OS400 system called "wkdis3"). But i am permenantly getting
that error:
Directory0: \\wkdis3\ROOT\home...
|
by: Pir8 |
last post by:
I have a complex xml file, which contains stories within a magazine. The
structure of the xml file is as follows:
<?xml version="1.0" encoding="ISO-8859-1" ?>
<magazine>
<story>...
|
by: Christoph Bisping |
last post by:
Hello!
Maybe someone is able to give me a little hint on this:
I've written a vb.net app which is mainly an interpreter for specialized
CAD/CAM files.
These files mainly contain simple movement...
|
by: Rick Walsh |
last post by:
I have an HTML table in the following format:
<table>
<tr><td>Header 1</td><td>Header 2</td></tr>
<tr><td>1</td><td>2</td></tr>
<tr><td>3</td><td>4</td></tr>
<tr><td>5</td><td>6</td></tr>...
|
by: toton |
last post by:
Hi,
I have some ascii files, which are having some formatted text. I want
to read some section only from the total file.
For that what I am doing is indexing the sections (denoted by .START
in...
|
by: Paulers |
last post by:
Hello,
I have a log file that contains many multi-line messages. What is the
best approach to take for extracting data out of each message and
populating object properties to be stored in an...
|
by: Chris Carlen |
last post by:
Hi:
Having completed enough serial driver code for a TMS320F2812
microcontroller to talk to a terminal, I am now trying different
approaches to command interpretation.
I have a very simple...
|
by: charliefortune |
last post by:
I am fetching some product feeds with PHP like this
$merch = substr($key,1);
$feed = file_get_contents($_POST);
$fp = fopen("./feeds/feed".$merch.".txt","w+");
fwrite ($fp,$feed);
fclose...
|
by: Felipe De Bene |
last post by:
I'm having problems parsing an HTML file with the following syntax :
<TABLE cellspacing=0 cellpadding=0 ALIGN=CENTER BORDER=1 width='100%'>
<TH BGCOLOR='#c0c0c0' Width='3%'>User ID</TH>
<TH...
|
by: DolphinDB |
last post by:
Tired of spending countless mintues downsampling your data? Look no further!
In this article, you’ll learn how to efficiently downsample 6.48 billion high-frequency records to 61 million...
|
by: isladogs |
last post by:
The next Access Europe meeting will be on Wednesday 6 Mar 2024 starting at 18:00 UK time (6PM UTC) and finishing at about 19:15 (7.15PM).
In this month's session, we are pleased to welcome back...
|
by: jfyes |
last post by:
As a hardware engineer, after seeing that CEIWEI recently released a new tool for Modbus RTU Over TCP/UDP filtering and monitoring, I actively went to its official website to take a look. It turned...
|
by: ArrayDB |
last post by:
The error message I've encountered is; ERROR:root:Error generating model response: exception: access violation writing 0x0000000000005140, which seems to be indicative of an access violation...
|
by: PapaRatzi |
last post by:
Hello,
I am teaching myself MS Access forms design and Visual Basic. I've created a table to capture a list of Top 30 singles and forms to capture new entries. The final step is a form (unbound)...
|
by: CloudSolutions |
last post by:
Introduction:
For many beginners and individual users, requiring a credit card and email registration may pose a barrier when starting to use cloud servers. However, some cloud server providers now...
|
by: af34tf |
last post by:
Hi Guys, I have a domain whose name is BytesLimited.com, and I want to sell it. Does anyone know about platforms that allow me to list my domain in auction for free. Thank you
|
by: Faith0G |
last post by:
I am starting a new it consulting business and it's been a while since I setup a new website. Is wordpress still the best web based software for hosting a 5 page website? The webpages will be...
|
by: isladogs |
last post by:
The next Access Europe User Group meeting will be on Wednesday 3 Apr 2024 starting at 18:00 UK time (6PM UTC+1) and finishing by 19:30 (7.30PM).
In this session, we are pleased to welcome former...
| |