Bytes IT Community

Hi,

I have some basic questions about threads.

I have a list of items which need to be downloaded from the internet.
Let's say the list is:

list_items[], which has 100 items in it.

I have a function download_from_web() which does the work of
downloading the items from the web. It does error handling in case it
gets errors like 404 et cetera.

Currently, this is how I'm using it.

for item in list_items:
    download_from_web(item)

This way, one item is downloaded at a time.

I'm planning to implement threads in my application so that multiple
items can be downloaded concurrently. I want the thread option to be
user-defined.

Looking at the documentation of threads (Core Python Programming), I've
noticed that all threads are executed at once. Depending upon what they
are doing, some finish early and some later.

But I want to implement something like:

for item in list_items:
    for num in thread_args:
        thread[num].start()
    thread[num].start()

Is this the correct way of threading applications?
This is the first time I'd be doing threading, so I was looking for
comments and suggestions.

Thanks,
Ritesh

Jul 27 '06 #1
Share this Question
Share on Google+
27 Replies


Ritesh Raj Sarraf wrote:
I'm planning to implement threads in my application so that multiple
items can be downloaded concurrently. I want the thread option to be
user-defined.

Looking at the documentation of threads (Core Python Programming), I've
noticed that all threads are executed at once. Depending upon what they
are doing, some finish early and some later.

But I want to implement something like:

for item in list_items:
    for num in thread_args:
        thread[num].start()
    thread[num].start()

Is this the correct way of threading applications?
This is the first time I'd be doing threading, so I was looking for
comments and suggestions.
What you want is to use a pool of threads so that you can configure how
many requests are issued at a time (you don't want to try to issue 100
requests all in parallel). You can communicate with the threads through a
Queue.

So if the code for a thread looks like:

def run(request, response):
    while 1:
        item = request.get()
        if item is None:
            break
        response.put(download_from_web(item))

# your main loop can be something like:

requestQueue = Queue()
responseQueue = Queue()
thread_pool = [
    Thread(target=run, args=(requestQueue, responseQueue))
    for i in range(numthreads)]
for t in thread_pool: t.start()

for item in list_items:
    requestQueue.put(item)

for i in range(len(list_items)):
    response = responseQueue.get()
    handle_response(response)

# and then to shut down the threads when you've finished:
for t in thread_pool:
    requestQueue.put(None)
for t in thread_pool:
    t.join()

Jul 27 '06 #2

Duncan,

I couldn't make out much from the code.
Instead this is what I did.

threads = []
nloops = range(len(lRawData))
for i in nloops:
    (sUrl, sFile, download_size, checksum) = stripper(lRawData[i])
    t = threading.Thread(target=download_from_web,
                         args=(sUrl, sFile, sSourceDir, None))
    # t = pypt_thread(download_from_web, i, stripper(lRawData[i]))
    threads.append(t)

i = 0
join_i = 0
while i < nloops:
    counter = 0
    while counter < 3:
        threads[i].start()
        counter += 1
        i += 1
    counter = 0
    join_i = i - 3
    while counter < 3:
        threads[join_i].join()
        counter += 1
        join_i += 1

Is this correct ? Comments!!

Ritesh
Jul 27 '06 #3

Ritesh Raj Sarraf wrote:
Duncan,

I couldn't make out much from the code.
Please, try again to understand Duncan's code. It's much better than
what you did.
Instead this is what I did.

threads = []
nloops = range(len(lRawData))
for i in nloops:
    (sUrl, sFile, download_size, checksum) = stripper(lRawData[i])
    t = threading.Thread(target=download_from_web,
                         args=(sUrl, sFile, sSourceDir, None))
    # t = pypt_thread(download_from_web, i, stripper(lRawData[i]))
    threads.append(t)

i = 0
join_i = 0
while i < nloops:
    counter = 0
    while counter < 3:
        threads[i].start()
        counter += 1
        i += 1
    counter = 0
    join_i = i - 3
    while counter < 3:
        threads[join_i].join()
        counter += 1
        join_i += 1
Is this correct ? Comments!!
This is just painful. It's not exactly "incorrect", but I think (I
*think*, it's hard to tell even after reading it 3 times) that it's
going to only download three requests at a time and if one (or more) of
the requests takes a long time it will hold up all the others. Also,
you're creating one thread per item in lRawData even though you only
need/use three at a time.

If you set numthreads = 3 in Duncan's code, you would only be
downloading 3 things at a time, but you'd only be using three threads,
and long-running requests would not affect the others.

If you need help understanding it please ask questions. I, for one,
would be happy to comment it for you to explain how it works. It's so
nice and elegant that I've already cut-and-pasted it into my own
"notebook" of cool useful python "patterns" to use in the future.

Reread it slowly, think about what it's doing, if questions arise write
them down and ask them. Duncan's code is beautiful. It's well worth
your time to understand it.

Peace,
~Simon
Jul 27 '06 #4

Ritesh Raj Sarraf wrote:
Duncan,

I couldn't make out much from the code.
Instead this is what I did.

[snip]
Is this correct ? Comments!!
This is bad because you start one thread for each URL and then let them run
in batches of 3. So you get all the overhead of creating lots of threads and
very little of the benefit. Much better to just create 3 threads and let
each one handle as many of the requests as it can.
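(In today's Python, the "create 3 threads and let each one drain the queue" pattern is available ready-made as concurrent.futures.ThreadPoolExecutor. A minimal sketch in modern Python 3; the download_from_web body and the items here are illustrative stand-ins, not the poster's real code:)

```python
from concurrent.futures import ThreadPoolExecutor

def download_from_web(item):
    # Stand-in for the real downloader.
    return "downloaded " + item

list_items = ["a", "b", "c", "d"]

# Three worker threads; each one picks up the next item as soon as it
# is free, so a slow download never holds up the others.
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(download_from_web, list_items))
```

pool.map() returns results in the same order as the inputs, even though the downloads run concurrently.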
Jul 27 '06 #5

Simon Forman wrote:
If you need help understanding it please ask questions. I, for one,
would be happy to comment it for you to explain how it works. It's so
nice and elegant that I've already cut-and-pasted it into my own
"notebook" of cool useful python "patterns" to use in the future.

Reread it slowly, think about what it's doing, if questions arise write
them down and ask them. Duncan's code is beautiful. It's well worth
your time to understand it.
If you convert what I wrote into an example which actually runs then do
repost it. It might also be easier to understand since I'm sure I'll have
made some mistakes or omitted some critical bits (such as the import
lines).
Jul 27 '06 #6

Ritesh Raj Sarraf wrote:
[snip]
for item in list_items:
    download_from_web(item)

This way, one item is downloaded at a time.

I'm planning to implement threads in my application so that multiple
items can be downloaded concurrently. I want the thread option to be
user-defined.
[snip]

See my post about the iterthreader module I wrote...
http://groups.google.com/group/comp....29fae28cf44c1/

for url, result in Threader(download_from_web, list_items):
print url, result
#...

--
- Justin

Jul 27 '06 #7

Duncan Booth wrote:
Simon Forman wrote:
If you need help understanding it please ask questions. I, for one,
would be happy to comment it for you to explain how it works. It's so
nice and elegant that I've already cut-and-pasted it into my own
"notebook" of cool useful python "patterns" to use in the future.

Reread it slowly, think about what it's doing, if questions arise write
them down and ask them. Duncan's code is beautiful. It's well worth
your time to understand it.

If you convert what I wrote into an example which actually runs then do
repost it. It might also be easier to understand since I'm sure I'll have
made some mistakes or omitted some critical bits (such as the import
lines).
A pleasure.

There was one missing close-parenthesis in the Thread() call, but other
than that (and the implied imports and variables) everything was good.

I added some dummy code to simulate long-running requests, and I made
the run() function return its Thread's name, but other than that it's
basically the code you posted. It's neat to increase NUMTHREADS and
watch the script run faster. :)

When the time comes to use this in something I'll probably wrap it up
in a class, and add some error handling to the run() function (or
method). But in the meantime it's quite elegant and useful as is.
Definitely not not Scottish ;-)

Thanks Duncan.

Peace,
~Simon

from Queue import Queue
from threading import Thread, currentThread

# Imports for the dummy testing func.
from time import sleep
from random import random

NUMTHREADS = 3

def dummy_func(secs):
    '''
    Sleep for secs seconds then return secs.
    (Dummy function to simulate requests.)
    '''
    sleep(secs)
    return secs

# Some fake requests to pass to dummy_func().
dummy_requests = [
    5 * random() for notused in xrange(100)
]

# Dummy handle_response() for demo purposes.
def handle_response(resp): print resp

def run(request, response, func=dummy_func):
    '''
    Get items from the request Queue, process them
    with func(), put the results along with the
    Thread's name into the response Queue.

    Stop running once an item is None.
    '''
    name = currentThread().getName()
    while 1:
        item = request.get()
        if item is None:
            break
        response.put((name, func(item)))

# Create two Queues for the requests and responses.
requestQueue = Queue()
responseQueue = Queue()

# Pool of NUMTHREADS Threads that run run().
thread_pool = [
    Thread(
        target=run,
        args=(requestQueue, responseQueue),
        name="Thread %i" % i
    )
    for i in range(NUMTHREADS)
]

# Start the threads.
for t in thread_pool: t.start()

# Queue up the requests.
for item in dummy_requests: requestQueue.put(item)

# Shut down the threads after all requests end.
# (Put one None "sentinel" for each thread.)
for t in thread_pool: requestQueue.put(None)

# Get and handle each response.
for notused in xrange(len(dummy_requests)):
    response = responseQueue.get()
    handle_response(response)

# Don't end the program prematurely.
#
# (Note that because Queue.get() is blocking by
# default this isn't strictly necessary.  But if
# you were, say, handling responses in another
# thread, you'd want something like this in your
# main thread.)
for t in thread_pool: t.join()

###
Example output from a run of the above script:

('Thread 0', 0.10915462751068916)
('Thread 0', 0.29428189134629135)
('Thread 1', 1.6234285192453246)
('Thread 0', 3.195799156145096)
('Thread 1', 2.7641123440885367)
('Thread 2', 4.7810243032096862)
('Thread 2', 1.1752965020601662)
('Thread 1', 2.0727863018148924)
('Thread 0', 4.8127195859913252)
('Thread 1', 2.4780495377626242)
..
..
..
etc...

Jul 27 '06 #8

Duncan Booth on Thursday 27 Jul 2006 17:17 wrote:
What you want is to use a pool of threads so that you can configure how
many requests are issued at a time (you don't want to try to issue 100
requests all in parallel). You can communicate with the threads through a
Queue.
Thanks to both Duncan and Simon for your motivating replies. I had forgotten
that these lists are a medium to learn and educate.
I hope you won't be too frustrated by my naive questions.
So if the code for a thread looks like:

def run(request, response):
    while 1:
        item = request.get()
        if item is None:
            break
        response.put(download_from_web(item))
This is where I'm most confused.

I have a list, lRawData, which has around, say, 100 items which are read from
a file.
Currently (without threads), I iterate over each item in the list, split the
item, manipulate it, and pass it to download_from_web() to do the real
work.

I'm mainly confused by your run().
The line response.put(download_from_web(item)) is there, but if I use it, how
am I going to iterate over my list lRawData?
I'm not able to see where and how run(), or the line item = request.get(),
goes into my list lRawData to get the items.

Can you please explain the functioning of the above code block?
# your main loop can be something like:

requestQueue = Queue()
responseQueue = Queue()
thread_pool = [
    Thread(target=run, args=(requestQueue, responseQueue))
    for i in range(numthreads)]
for t in thread_pool: t.start()
This part seems understandable.
for item in list_items:
    requestQueue.put(item)

for i in range(len(list_items)):
    response = responseQueue.get()
    handle_response(response)
I guess these would be clear once I'm clear on the first problem.
# and then to shut down the threads when you've finished:
for t in thread_pool:
    requestQueue.put(None)
for t in thread_pool:
    t.join()
Thanks a lot for your replies.

Ritesh
--
Ritesh Raj Sarraf
RESEARCHUT - http://www.researchut.com
"Necessity is the mother of invention."
"Stealing logic from one person is plagiarism, stealing from many is research."
"The great are those who achieve the impossible, the petty are those who
cannot - rrs"

Jul 28 '06 #9

And people, is there any documentation on Python threads, or threads in
general? It'd be of great help to really understand them.

Ritesh

Ritesh Raj Sarraf on Thursday 27 Jul 2006 16:37 wrote:
Is this the correct way of threading applications?
This is the first time I'd be doing threading, so I was looking for
comments and suggestions.

Jul 28 '06 #10

Simon Forman on Thursday 27 Jul 2006 22:47 wrote:
def run(request, response, func=dummy_func):
    '''
    Get items from the request Queue, process them
    with func(), put the results along with the
    Thread's name into the response Queue.

    Stop running once an item is None.
    '''
    name = currentThread().getName()
    while 1:
        item = request.get()
        if item is None:
            break
        response.put((name, func(item)))
Meanwhile, instead of sitting idle and waiting for a reply, I thought of
trying to understand the code (the example by Simon).
The good part is that I was able to use it. :-)

Here's the changed code:

from Queue import Queue
from threading import Thread, currentThread

# Imports for the dummy testing func.
from time import sleep
from random import random

NUMTHREADS = 3

def run(request, response, func=download_from_web):
    '''
    Get items from the request Queue, process them
    with func(), put the results along with the
    Thread's name into the response Queue.

    Stop running once an item is None.
    '''
    name = currentThread().getName()
    while 1:
        item = request.get()
        (sUrl, sFile, download_size, checksum) = stripper(item)
        if item is None:
            break
        response.put((name, func(sUrl, sFile, sSourceDir, None)))

# Create two Queues for the requests and responses
requestQueue = Queue()
responseQueue = Queue()

# Pool of NUMTHREADS Threads that run run().
thread_pool = [
    Thread(
        target=run,
        args=(requestQueue, responseQueue)
    )
    for i in range(NUMTHREADS)
]

# Start the threads.
for t in thread_pool: t.start()

# Queue up the requests.
for item in lRawData: requestQueue.put(item)

# Shut down the threads after all requests end.
# (Put one None "sentinel" for each thread.)
for t in thread_pool: requestQueue.put(None)

# Don't end the program prematurely.
#
# (Note that because Queue.get() is blocking by
# default this isn't strictly necessary.  But if
# you were, say, handling responses in another
# thread, you'd want something like this in your
# main thread.)
for t in thread_pool: t.join()

I'd like to put my understanding down here and would be happy if people can
correct me where I'm wrong.

So here it goes:
Firstly the code initializes the number of threads. Then it moves on to
initializing requestQueue() and responseQueue().
Then it moves on to thread_pool, where it realizes that it has to execute the
function run().
From NUMTHREADS in the for loop, it knows how many threads it is supposed to
execute in parallel.

So once the thread_pool is populated, it starts the threads.
Actually, it doesn't start the threads. Instead, it puts the threads into the
queue.

Then the real iteration, about which I was talking in my earlier post, is done.
The iteration happens in one go. And requestQueue.put(item) puts all the items
from lRawData into the queue of the run().
But there, the run() already knows its limitation on the number of threads.

No, I think the above statement is wrong. The actual pool about the number of
threads is stored by thread_pool. Once its pool (at a time 3 as per this
example) is empty, it again requests for more threads using the requestQueue()

And in function run(), when the item of lRawData is None, the thread stops.
Then the cleanup and checks of any remaining threads are done.

Is this all correct ?

I also have a couple more questions, related to locks, but I'll post them
once I'm done with this part.

Thanks,
Ritesh

Jul 28 '06 #11

Ritesh Raj Sarraf wrote:
I'd like to put my understanding down here and would be happy if people can
correct me where I'm wrong.
ok :-)
So here it goes:
Firstly the code initializes the number of threads. Then it moves on to
initializing requestQueue() and responseQueue().
Then it moves on to thread_pool, where it realizes that it has to execute the
function run().
From NUMTHREADS in the for loop, it knows how many threads it is supposed to
execute in parallel.
right...
So once the thread_pool is populated, it starts the threads.
Actually, it doesn't start the threads. Instead, it puts the threads into the
queue.
Neither: it puts the threads into a list. It "puts" the queues into the
threads, by passing them as arguments to the Thread constructor.
Then the real iteration, about which I was talking in my earlier post, is done.
The iteration happens in one go. And requestQueue.put(item) puts all the items
from lRawData into the queue of the run().
It doesn't necessarily have to put all the items into the queue at once.
The previous line starts all the threads, which immediately start
running:

    while 1:
        item = request.get()
        ...

The default Queue size is infinite, but the program would still work
fine if the queue was fixed at, say, 6 elements. Now that I think of
it, it may even perform better... If you have an iterator that will
generate a very large number of items, and the function being called by
each thread is slow, the queue may end up growing to hold millions of
items and cause the system to run out of memory.
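That bounded-queue idea can be sketched in modern Python 3 like this (the worker body and item values are illustrative stand-ins for download_from_web and the real work items):

```python
import queue
import threading

NUMTHREADS = 3

def worker(requests, responses):
    # Pull items until the None sentinel arrives.
    while True:
        item = requests.get()
        if item is None:
            break
        responses.put(item * 2)   # stand-in for download_from_web(item)

requests = queue.Queue(maxsize=6)   # bounded: put() blocks when full
responses = queue.Queue()

pool = [threading.Thread(target=worker, args=(requests, responses))
        for _ in range(NUMTHREADS)]
for t in pool:
    t.start()

for item in range(100):
    requests.put(item)    # blocks whenever 6 items are already waiting
for _ in pool:
    requests.put(None)    # one sentinel per worker

for t in pool:
    t.join()

results = []
while not responses.empty():
    results.append(responses.get())
```

Because put() blocks once 6 items are pending, the producer can never race far ahead of the workers, so memory use stays flat no matter how many items there are.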
But there, the run() already knows its limitation on the number of threads.
run() doesn't know anything about threads. All it knows is that it can
call request.get() to get an item to work on, and response.put() when
finished.
No, I think the above statement is wrong. The actual pool about the number of
threads is stored by thread_pool. Once its pool (at a time 3 as per this
example) is empty, it again requests for more threads using the requestQueue()
The pool is never empty. The program works like a bank with 3 tellers.
Each teller knows nothing about any of the other tellers, or how many
people are waiting in the line. All they know is that when they say
"Next!" (request.get()) another person steps in front of them. The
tellers don't move, the line moves.
And in function run(), when the item of lRawData is None, the thread stops.
Then the cleanup and checks of any remaining threads are done.
Yes, since each thread doesn't know anything about the rest of the
program, when you send it an empty item it knows to quit. It would be
analogous to the bank teller saying "Next!" and instead of a customer,
the bank mananger steps forward to tell them that they can go home for
the day.
Is this all correct ?
Mostly :) When you understand it fully, you should look at the example
I showed you before. It is essentially the same thing, just wrapped in
a class to be reusable.

--
- Justin

Jul 28 '06 #12

Hi,

I have this following situation:

#INFO: Thread Support
# Will require more design thoughts
from Queue import Queue
from threading import Thread, currentThread

NUMTHREADS = variables.options.num_of_threads

def run(request, response, func=download_from_web):
    '''Get items from the request Queue, process them
    with func(), put the results along with the
    Thread's name into the response Queue.

    Stop running once an item is None.'''

    name = currentThread().getName()
    while 1:
        item = request.get()
        (sUrl, sFile, download_size, checksum) = stripper(item)
        if item is None:
            break
        response.put((name, func(sUrl, sFile, sSourceDir, None)))

My download_from_web() returns True or False depending upon whether the download
was successful or failed. How can I check that in the above code ?

One other question I had,
If my user passes the --zip option, download_from_web() internally (when the
download is successful) zips the downloaded data to a zip file. Since in case
of threading there'll be multiple threads, and say one of the threads
completes 2 seconds before the others and is doing the zipping work:
What will the other thread, at that moment do, if it completes while the
previous thread is doing the zipping work ?

Thanks,
Ritesh


Aug 2 '06 #13

Ritesh Raj Sarraf wrote:
Hi,

I have this following situation:

#INFO: Thread Support
# Will require more design thoughts
from Queue import Queue
from threading import Thread, currentThread

NUMTHREADS = variables.options.num_of_threads

def run(request, response, func=download_from_web):
    '''Get items from the request Queue, process them
    with func(), put the results along with the
    Thread's name into the response Queue.

    Stop running once an item is None.'''

    name = currentThread().getName()
    while 1:
        item = request.get()
        (sUrl, sFile, download_size, checksum) = stripper(item)
        if item is None:
            break
        response.put((name, func(sUrl, sFile, sSourceDir, None)))
One thing about this code: you should check whether item is None
*before* passing it to stripper(). Even if stripper() can handle None
as input there's no reason to make it do so.
My download_from_web() returns True or False depending upon whether the download
was successful or failed. How can I check that in the above code ?
Well, you'd probably want to check that *outside* the above run()
function. The True/False return values will queue up in the
responseQueue, where you can access them with responseQueue.get().
Since these can be in a different order than your requests you're going
to want to "tag" them with the sUrl and sFile.

response.put((name, sUrl, sFile, func(sUrl, sFile, sSourceDir, None)))

That way your response handing code can tell which downloads succeeded
and which failed.

Of course, you could have the run() function check the return value
itself and retry the download one or a few times.
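Both suggestions together look roughly like this in modern Python 3 spelling (flaky_download and the URLs are made up for illustration; the point is tagging each response with its request and retrying inside run()):

```python
import queue
import threading

def flaky_download(url):
    # Stand-in for download_from_web(): pretend URLs ending in "bad" fail.
    return not url.endswith("bad")

def run(request, response, func=flaky_download, retries=3):
    while True:
        item = request.get()
        if item is None:
            break
        ok = False
        for _ in range(retries):       # retry a failed download a few times
            ok = func(item)
            if ok:
                break
        response.put((item, ok))       # tag the result with its request

requests, responses = queue.Queue(), queue.Queue()
urls = ["http://x/a", "http://x/bad", "http://x/c"]

t = threading.Thread(target=run, args=(requests, responses))
t.start()
for u in urls:
    requests.put(u)
requests.put(None)
t.join()

# Responses may arrive in any order, but each one carries its request,
# so the handler can tell which downloads succeeded and which failed.
results = dict(responses.get() for _ in urls)
```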
One other question I had,
If my user passes the --zip option, download_from_web() internally (when the
download is successful) zips the downloaded data to a zip file. Since in case
of threading there'll be multiple threads, and say if one of the thread
completes 2 seconds before others and is doing the zipping work:
What will the other thread, at that moment do, if it completes while the
previous thread is doing the zipping work ?
The other threads will just take the next request from the Queue and
process it. They won't "care" what the one thread is doing,
downloading, zipping, whatever.
Peace,
~Simon

Aug 2 '06 #14


Simon Forman wrote:
One other question I had,
If my user passes the --zip option, download_from_web() internally (when the
download is successful) zips the downloaded data to a zip file. Since in case
of threading there'll be multiple threads, and say one of the threads
completes 2 seconds before the others and is doing the zipping work:
What will the other thread, at that moment do, if it completes while the
previous thread is doing the zipping work ?

The other threads will just take the next request from the Queue and
process it. They won't "care" what the one thread is doing,
downloading, zipping, whatever.

The thread will be marked as complete only when the function that it
executed exits. Right ?

download_from_web() internally calls a function to zip the file. So it
doesn't return before zipping. Till then this thread is not complete
and therefore is busy working (i.e. zipping).

During the same time, if another thread (which is also calling
download_from_web) completes the download, the download function will
again call the zip code. In that particular situation, will it wait
for the previous thread to complete the zipping and release the file so
that it can zip more data to it, or will it just panic and quit?

I think this shouldn't be much concern. The d_f_w() calls zip(). And
since zip opens the file.zip file in append mode, no matter how many
threads access it, it should be okay.

Ritesh

Aug 3 '06 #15


Simon Forman wrote:
The other threads will just take the next request from the Queue and
process it. They won't "care" what the one thread is doing,
downloading, zipping, whatever.
As I mentioned in my previous post, the other threads will also have to
go through the same "zip the file if the download was successful"
method.

I implemented it but am seeing some issues.
If I use a single thread, all files are zipped to the archive.
Obviously this has to work.

If the threads are 2, 3 or 4 in number, some of the files don't show up
in the archive.

What would a thread (which wants to add files to an archive) do if it
finds that another thread has opened the archive in append mode?
Will it wait, overwrite or just die?

Looking at the way my program is behaving, it looks like at that
condition the threads are skipping/dying or something like that.

Ritesh

Aug 3 '06 #16

Ritesh Raj Sarraf wrote:
Simon Forman wrote:
One other question I had,
If my user passes the --zip option, download_from_web() internally (when the
download is successful) zips the downloaded data to a zip file. Since in case
of threading there'll be multiple threads, and say one of the threads
completes 2 seconds before the others and is doing the zipping work:
What will the other thread, at that moment do, if it completes while the
previous thread is doing the zipping work ?
The other threads will just take the next request from the Queue and
process it. They won't "care" what the one thread is doing,
downloading, zipping, whatever.

The thread will be marked as complete only when the function that it
executed exits. Right ?

download_from_web() internally calls a function to zip the file. So it
doesn't return before zipping. Till then this thread is not complete
and therefore is busy working (i.e. zipping).

During the same time, if another thread (which is also calling
download_from_web) completes the download, the download function will
again call the zip code. In that particular situation, will it wait
for the previous thread to complete the zipping and release the file so
that it can zip more data to it, or will it just panic and quit?
If you have multiple threads trying to access the same ZIP file at the
same time, whether or not they use the same ZipFile object, you'll have
trouble. You'd have to change download_from_web to protect against
simultaneous use. A simple lock should suffice. Create the lock in
the main thread, like so:

ziplock = threading.Lock()

Then change the zipping part of download_from_web to acquire and
release this lock; do zipfile operations only between them.

ziplock.acquire()
try:
    do_all_zipfile_stuff_here()
finally:
    ziplock.release()

If you can't change download_from_web, you might have no choice but to
download sequentially.

OTOH, if each thread uses a different ZIP file (and a different ZipFile
object), you wouldn't have to use a lock. It doesn't sound like you're
doing that, though.

It shouldn't be a problem if one thread is zipping at the same time
another is downloading, unless there's some common data between them
for some reason.
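A minimal, self-contained sketch of this pattern (the names ARCHIVE and download_and_zip are hypothetical stand-ins, and the "downloaded" payloads are faked; real download work would run outside the locked region):

```python
import os
import tempfile
import threading
import zipfile

ARCHIVE = os.path.join(tempfile.mkdtemp(), "downloads.zip")
ziplock = threading.Lock()          # created once, shared by every thread

def download_and_zip(name, payload):
    # ... the actual download would happen here, outside the lock ...
    ziplock.acquire()
    try:
        # Only one thread at a time may append to the archive.
        zf = zipfile.ZipFile(ARCHIVE, "a", zipfile.ZIP_DEFLATED)
        zf.writestr(name, payload)
        zf.close()
    finally:
        ziplock.release()

threads = [threading.Thread(target=download_and_zip,
                            args=("file%d.txt" % i, "payload %d" % i))
           for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Since only the zipfile operations sit inside acquire/release, the downloads still overlap; only the appends are serialized.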
Carl Banks

Aug 3 '06 #17

On 2006-08-03 08:49:45, Ritesh Raj Sarraf wrote:
I implemented it but am seeing some issues.
If I use a single thread, all files are zipped to the archive.
Obviously this has to work.

If threads are 2 or 3 or 4 in numbers, some of the files don't show up
in the archive.

What would a thread (which wants to add files to an archive) do if it
finds that another thread has opened the archive in append mode?
Will it wait, overwrite, or just die?
Depends on the library or program you use for zipping, but usually they
just die and return an error ("could not open file" or so).

Rather than downloading and zipping in the same thread, you could run
multiple threads like you're doing that only download files, and one
zip-it-all-up thread. After downloading a file, the download threads place
a message in a queue that indicates the file they have downloaded, and the
zip-it-all-up thread takes messages out of that queue, one at a time, and
zips the files.
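A rough sketch of this layout, with hypothetical names (the real download and zip calls are replaced by comments, and a None sentinel tells the zipper thread to stop):

```python
import queue
import threading

done_queue = queue.Queue()   # download threads announce finished files here
SENTINEL = None

def downloader(items):
    for item in items:
        # the real download of `item` would happen here
        done_queue.put(item)          # tell the zipper this file is ready

def zipper(results):
    while True:
        item = done_queue.get()
        if item is SENTINEL:
            break
        # zipping `item` would happen here; only this one thread ever zips
        results.append(item)

results = []
workers = [threading.Thread(target=downloader, args=([n],))
           for n in ("a", "b", "c")]
zt = threading.Thread(target=zipper, args=(results,))
zt.start()
for w in workers:
    w.start()
for w in workers:
    w.join()
done_queue.put(SENTINEL)              # all downloads done: stop the zipper
zt.join()
```

Because exactly one thread touches the archive, no lock around the zipfile is needed in this design.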

Gerhard

Aug 3 '06 #18


Gerhard Fiedler wrote:
Rather than downloading and zipping in the same thread, you could run
multiple threads like you're doing that only download files, and one
zip-it-all-up thread. After downloading a file, the download threads place
a message in a queue that indicates the file they have downloaded, and the
zip-it-all-up thread takes messages out of that queue, one at a time, and
zips the files.
I was using this approach earlier. The problem with this approach is
too much temporary disk usage.

Say I'm downloading 2 GB of data which is a combination of, say 600
files. Now following this approach, I'll have to make sure that I have
4 GB of disk space available on my hard drive.

Whereas downloading in pieces, adding them to the archive, and then
unlinking the downloaded file makes proper use of the disk
resources.

Thanks,
Ritesh

Aug 4 '06 #19


Carl Banks wrote:
If you have multiple threads trying to access the same ZIP file at the
same time, whether or not they use the same ZipFile object, you'll have
trouble. You'd have to change download_from_web to protect against
simultaneous use. A simple lock should suffice. Create the lock in
the main thread, like so:

ziplock = threading.Lock()
Thanks. This looks to be the correct way to go. I do have access to all
the source code as it is under GPL.

Then change the zipping part of download_from_web to acquire and
release this lock; do zipfile operations only between them.

ziplock.acquire()
try:
do_all_zipfile_stuff_here()
finally:
ziplock.release()
I hope that while one thread has acquired the lock, the other threads (which
have done the downloading work and are ready to zip) will wait.
If you can't change download_from_web, you might have no choice but to
download sequentially.

OTOH, if each thread uses a different ZIP file (and a different ZipFile
object), you wouldn't have to use a lock. It doesn't sound like you're
doing that, though.

It shouldn't be a problem if one thread is zipping at the same time
another is downloading, unless there's some common data between them
for some reason.

Thanks,
Ritesh

Aug 4 '06 #20

Ritesh Raj Sarraf wrote:
Carl Banks wrote:
Then change the zipping part of download_from_web to acquire and
release this lock; do zipfile operations only between them.

ziplock.acquire()
try:
do_all_zipfile_stuff_here()
finally:
ziplock.release()

I hope while one thread has acquired the lock, the other threads (which
have done the downloading work and are ready to zip) would wait.
Exactly. Only one thread can hold a lock at a time. If a thread tries
to acquire a lock that some other thread has, it'll wait until the
other thread releases it. You need locks to do this stuff because most
things (such as zipfile objects) don't wait for other threads to
finish.
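A small demonstration of this mutual exclusion (the counter and thread counts are illustrative): without the lock, the read-modify-write below could interleave across threads and lose updates; with it, the final count is exact.

```python
import threading

lock = threading.Lock()
counter = [0]

def bump(n):
    for _ in range(n):
        lock.acquire()
        try:
            counter[0] += 1     # only one thread runs this at a time
        finally:
            lock.release()

threads = [threading.Thread(target=bump, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# counter[0] is exactly 40000
```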
Carl Banks

Aug 4 '06 #21


Carl Banks wrote:
Exactly. Only one thread can hold a lock at a time. If a thread tries
to acquire a lock that some other thread has, it'll wait until the
other thread releases it. You need locks to do this stuff because most
things (such as zipfile objects) don't wait for other threads to
finish.
I would heartily like to thank you for the suggestion you made.
My program now works exactly as I wanted. Thanks. :-)

Ritesh

Aug 4 '06 #22

Carl Banks wrote:
Ritesh Raj Sarraf wrote:
>Carl Banks wrote:
>>Then change the zipping part of download_from_web to acquire and
release this lock; do zipfile operations only between them.

ziplock.acquire()
try:
do_all_zipfile_stuff_here()
finally:
ziplock.release()

I hope while one thread has acquired the lock, the other threads (which
have done the downloading work and are ready to zip) would wait.

Exactly. Only one thread can hold a lock at a time.
In the code above, a form called a "critical section", we might
think of a thread as holding the lock when it is between the
acquire() and release(). But that's not really how Python's
locks work. A lock, even in the locked state, is not held by
any particular thread.
If a thread tries
to acquire a lock that some other thread has, it'll wait until the
other thread releases it.
More accurate: If a thread tries to acquire a lock that is in
the locked state, it will wait until some thread releases it.
(Unless it sets the blocking flag false.) If more than one thread
is waiting to acquire the lock, it may be blocked longer.

I think the doc for threading.Lock is good:

http://docs.python.org/lib/lock-objects.html
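The blocking flag mentioned above, in miniature: acquire(False) never waits, it simply reports whether the lock could be taken. Note too that the release need not come from the thread that acquired (a Lock is not owned by any particular thread):

```python
import threading

lock = threading.Lock()
first = lock.acquire(False)    # lock was free: returns True, lock is now locked
second = lock.acquire(False)   # already locked: returns False immediately
lock.release()                 # any thread could perform this release
third = lock.acquire(False)    # free again: True
lock.release()
```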
--
--Bryan
Aug 5 '06 #23

Bryan Olson on Saturday 05 Aug 2006 13:31 wrote:
>Exactly. Only one thread can hold a lock at a time.

In the code above, a form called a "critical section", we might
think of a thread as holding the lock when it is between the
acquire() and release(). But that's not really how Python's
locks work. A lock, even in the locked state, is not held by
any particular thread.
>If a thread tries
to acquire a lock that some other thread has, it'll wait until the
other thread releases it.

More accurate: If a thread tries to acquire a lock that is in
the locked state, it will wait until some thread releases it.
(Unless it sets the blocking flag false.) If more than one thread
is waiting to acquire the lock, it may be blocked longer.

I think the doc for threading.Lock is good:

http://docs.python.org/lib/lock-objects.html
You're correct.
I noticed that even while one thread has acquired the lock, the other threads
don't respect it. In fact they just go ahead and execute the statements inside
the acquire/release block. With this behavior, I'm ending up with a
partially corrupted zip archive file.

def run(request, response, func=copy_first_match):
    '''Get items from the request Queue, process them
    with func(), put the results along with the
    Thread's name into the response Queue.

    Stop running once an item is None.'''

    name = threading.currentThread().getName()
    ziplock = threading.Lock()
    while 1:
        item = request.get()
        if item is None:
            break
        (sUrl, sFile, download_size, checksum) = stripper(item)
        response.put((name, sUrl, sFile, func(cache, sFile, sSourceDir, checksum)))

        # This will take care of making sure that if downloaded, they are zipped
        (thread_name, Url, File, exit_status) = responseQueue.get()
        if exit_status == False:
            log.verbose("%s not available in local cache %s\n" % (File, cache))
            if download_from_web(sUrl, sFile, sSourceDir, checksum) != True:
                log.verbose("%s not downloaded from %s and NA in local cache %s\n\n" % (sFile, sUrl, sRepository))
            else:
                # We need this because we can't do join or exists operation on None
                if cache is None or os.path.exists(os.path.join(cache, sFile)):
                    #INFO: The file is already there.
                    pass
                else:
                    shutil.copy(sFile, cache)
                if zip_bool:
                    ziplock.acquire()
                    try:
                        compress_the_file(zip_type_file, sFile, sSourceDir)
                        # Remove it because we don't need the file once it is zipped.
                        os.remove(sFile)
                    finally:
                        ziplock.release()
        elif exit_status == True:
            if zip_bool:
                ziplock.acquire()
                try:
                    compress_the_file(zip_type_file, sFile, sSourceDir)
                    os.unlink(sFile)
                finally:
                    ziplock.release()

--
Ritesh Raj Sarraf
RESEARCHUT - http://www.researchut.com
"Necessity is the mother of invention."
"Stealing logic from one person is plagiarism, stealing from many is research."
"The great are those who achieve the impossible, the petty are those who
cannot - rrs"

Aug 5 '06 #24

Ritesh Raj Sarraf wrote:
[...]
I noticed that even while one thread has acquired the lock, the other threads
don't respect it. In fact they just go ahead and execute the statements inside
the acquire/release block. With this behavior, I'm ending up with a
partially corrupted zip archive file.
No, Carl's code was fine. I just found his explanation
misleading.

def run(request, response, func=copy_first_match):
'''Get items from the request Queue, process them
with func(), put the results along with the
Thread's name into the response Queue.

Stop running once an item is None.'''

name = threading.currentThread().getName()
ziplock = threading.Lock()
You don't want "ziplock = threading.Lock()" in the body of
the function. It creates a new and different lock on every
execution. Your threads are all acquiring different locks.
To coordinate your threads, they need to be using the same
lock.

Try moving "ziplock = threading.Lock()" out of the function, so
your code might read, in part:
ziplock = threading.Lock()

def run(request, response, func=copy_first_match):
# And so on...
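A stripped-down, runnable illustration of the corrected structure (the queue of items and the list standing in for the archive are hypothetical; compress_the_file() is replaced by a list append). The key point is that ziplock lives at module level, so every worker acquires the same lock:

```python
import queue
import threading

ziplock = threading.Lock()          # module level: one lock shared by all workers

def run(items, archive):
    while True:
        item = items.get()
        if item is None:            # sentinel: stop this worker
            break
        ziplock.acquire()
        try:
            archive.append(item)    # stands in for compress_the_file()
        finally:
            ziplock.release()

items = queue.Queue()
archive = []
for n in range(20):
    items.put(n)
workers = [threading.Thread(target=run, args=(items, archive)) for _ in range(4)]
for w in workers:
    w.start()
for _ in workers:
    items.put(None)                 # one sentinel per worker
for w in workers:
    w.join()
```

Had ziplock been created inside run(), each thread would lock its own private object and the "critical section" would protect nothing.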
--
--Bryan
Aug 5 '06 #25

Bryan Olson on Saturday 05 Aug 2006 23:56 wrote:
You don't want "ziplock = threading.Lock()" in the body of
the function. It creates a new and different lock on every
execution. Your threads are all acquiring different locks.
To coordinate your threads, they need to be using the same
lock.

Try moving "ziplock = threading.Lock()" out of the function, so
your code might read, in part:
ziplock = threading.Lock()

def run(request, response, func=copy_first_match):
# And so on...
Thanks. That did it. :-)

Ritesh
--
Ritesh Raj Sarraf
RESEARCHUT - http://www.researchut.com
"Necessity is the mother of invention."
"Stealing logic from one person is plagiarism, stealing from many is research."
"The great are those who achieve the impossible, the petty are those who
cannot - rrs"

Aug 5 '06 #26

Ritesh Raj Sarraf wrote:
Bryan Olson on Saturday 05 Aug 2006 23:56 wrote:
You don't want "ziplock = threading.Lock()" in the body of
the function. It creates a new and different lock on every
execution. Your threads are all acquiring different locks.
To coordinate your threads, they need to be using the same
lock.

Try moving "ziplock = threading.Lock()" out of the function, so
your code might read, in part:
ziplock = threading.Lock()

def run(request, response, func=copy_first_match):
# And so on...

Thanks. That did it. :-)

Ritesh
Another thing you might want to consider would be to split your
download and zipping code into separate functions, then create one more
thread to do all the zipping. That way your downloading threads would
never be waiting around for each other to zip.
Just a thought. :)
~Simon

Aug 5 '06 #27

On 2006-08-04 04:22:59, Ritesh Raj Sarraf wrote:
Gerhard Fiedler wrote:
>Rather than downloading and zipping in the same thread, you could run
multiple threads like you're doing that only download files, and one
zip-it-all-up thread. After downloading a file, the download threads
place a message in a queue that indicates the file they have
downloaded, and the zip-it-all-up thread takes messages out of that
queue, one at a time, and zips the files.

I was using this approach earlier. The problem with this approach is
too much temporary disk usage.

Say I'm downloading 2 GB of data which is a combination of, say 600
files. Now following this approach, I'll have to make sure that I have
4 GB of disk space available on my hard drive.
Not necessarily. You have a minimum speed of the zipping process, and a
maximum speed of the download. Between the two you can figure out what the
maximum required temp storage space is. It's in any case less than the full
amount, and if the minimum zipping speed is faster than the maximum
download speed, it's not more than a few files.
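That bound on temporary storage can also be enforced directly with a bounded queue: put() blocks the downloaders once maxsize files are waiting to be zipped, so the backlog of downloaded-but-unzipped files can never exceed the limit. A sketch with illustrative names (the real download/zip/delete steps are replaced by comments):

```python
import queue
import threading

pending = queue.Queue(maxsize=5)   # at most 5 downloaded-but-unzipped files

def downloader(names):
    for name in names:
        # the download of `name` would happen here
        pending.put(name)          # blocks while 5 files are already waiting

def zipper(zipped):
    while True:
        name = pending.get()
        if name is None:           # sentinel: no more files coming
            break
        # zipping `name` (then deleting the temp file) would happen here
        zipped.append(name)

zipped = []
zt = threading.Thread(target=zipper, args=(zipped,))
zt.start()
dt = threading.Thread(target=downloader,
                      args=([str(i) for i in range(12)],))
dt.start()
dt.join()
pending.put(None)
zt.join()
```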

But if your current solution works, then that's good enough :) It probably
wouldn't be much faster anyway; it would only avoid a few waiting periods.

Gerhard

Aug 6 '06 #28
