Hi,
I currently use this code to divide work between worker threads:
var linePtr: seq[pointer]
for line in memSlices(memfiles.open($args["--inputFile"], mode = fmRead)):
  linePtr.add(line.data)
let batchSize = (linePtr.len / parseInt($args["--cores"])).ceil().toInt()
let f = system.open($args["--inputFile"])
{.push experimental: "parallel".}
parallel:
  for i in 0..(parseInt($args["--cores"])-1):
    spawn worker(f, linePtr.at(batchSize * i), batchSize)
{.pop.}
and this worker function:
proc worker(fd: File, offset, size: int) {.thread.} =
  let fs = newFileStream(fd)
  fs.setPosition(offset)
  var i = 0
  while i < size:
    echo i, " ", size, " ", fs.atEnd(), " ", fs.readLine()
    inc(i)
Now the thing is: the program throws an IOError after a random number of lines; when I wrap the while loop as follows:
while i < size:
  try:
    echo i, " ", size, " ", fs.atEnd(), " ", fs.readLine()
  except IOError:
    echo i, " ", size, " ", fs.atEnd()
  inc(i)
I see that fs.atEnd() returns true even though I should have lines left. Why is this happening and how do I circumvent it?
Also a little side question: I currently divide into chunks of size ceil(total_lines/core_count), which means the last chunk is bigger or smaller than the previous ones. Is it possible to use fs.atEnd() as in while i < size and not fs.atEnd()? I didn't manage to get this to work either :(.
Thx in advance, ff
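For context on the side question, a small illustration of how the ceil-based split behaves (the concrete numbers here are made up for the example, not from the post):

```nim
import std/math

# Illustrative numbers (not from the post): batchSize = ceil(total / cores),
# as in the question, over-allocates the first chunks so the last one shrinks.
let
  totalLines = 100
  cores = 3
  batchSize = ceil(totalLines / cores).toInt   # ceil(33.33) = 34
var
  sizes: seq[int]
  remaining = totalLines
for i in 0 ..< cores:
  sizes.add min(batchSize, remaining)
  remaining -= batchSize
echo sizes   # -> @[34, 34, 32]
```

Note the min clamp (not present in the question's code): without it, the last worker is told to read batchSize lines even though fewer remain, which is exactly when reads past the end start failing.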
This is my chunking algorithm: https://forum.nim-lang.org/t/5579#34696
I can't say about atEnd, I would need to investigate it.
For reference for others, the previous thread: https://forum.nim-lang.org/t/5504#34400
It should be offset ..< size.
Basically the parallelChunks proc creates N chunks and gives you the chunkStart and chunkLength. Then you can spawn N serial procs that each take a chunkStart + chunkLength as arguments.
So what you did is good, except the sync should be outside the loop:
parallelChunks(1, linePtr.len(), chunkOffset, chunkSize):
  spawn worker(f, linePtr.at(chunkOffset), chunkSize)
sync()
and you just need to compile with --threads:on
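As a side note, and this is my own sketch rather than something from the thread: one way to sidestep the shared File/atEnd problem entirely is to keep the whole MemSlice values from memSlices instead of bare data pointers. Each slice carries both the line's address and its length, so a worker can read its chunk without ever seeking a shared stream. A minimal serial sketch (threading omitted; demo_input.txt is a throwaway file created just for the example):

```nim
import std/memfiles

# My own sketch, not from the thread: store whole MemSlice values rather than
# bare pointers -- each slice knows its line's address AND length, so no
# shared File handle or setPosition is needed per worker.
writeFile("demo_input.txt", "alpha\nbeta\ngamma\n")

var mf = memfiles.open("demo_input.txt", mode = fmRead)
var lines: seq[MemSlice]
for line in memSlices(mf):
  lines.add(line)

proc worker(lines: seq[MemSlice], chunkOffset, chunkSize: int) =
  # Each worker reads only its own [chunkOffset, chunkOffset+chunkSize) range;
  # `$` copies a MemSlice into an ordinary Nim string.
  for i in chunkOffset ..< chunkOffset + chunkSize:
    echo $lines[i]

worker(lines, 0, 2)   # prints "alpha" and "beta"
worker(lines, 2, 1)   # prints "gamma"
mf.close()
```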
For reference, I am using similar templates in Laser (but with an OpenMP backend, not Nim threadpools).
Here is a minimal working example of setting chunks in parallel with a random value.
import threadpool, random, cpuinfo

template parallelChunks(start, stop: int, chunkOffset, chunkSize: untyped{ident}, body: untyped): untyped =
  let
    numIters = (stop - start)
    numChunks = countProcessors()
    baseChunkSize = numIters div numChunks
    remainder = numIters mod numChunks
  var chunkOffset {.inject.}, chunkSize {.inject.}: Natural
  for threadID in 0 ..< numChunks:
    if threadID < remainder:
      chunkOffset = start + (baseChunkSize + 1) * threadID
      chunkSize = baseChunkSize + 1
    else:
      chunkOffset = start + baseChunkSize * threadID + remainder
      chunkSize = baseChunkSize
    block: body
proc setValue(chunk: ptr int, size: int, value: int) =
  # Allow array indexing on pointers
  let chunk = cast[ptr UncheckedArray[int]](chunk)
  for i in 0 ..< size:
    chunk[i] = value
proc main() =
  # We will randomly set the values of a 100-element sequence
  var s = newSeq[int](100)
  echo "Before: ", s
  # Seed the RNG
  randomize(1234)
  parallelChunks(0, s.len - 1, chunkOffset, chunkSize):
    # Take a pointer to the offset of the seq
    let chunkStart = s[chunkOffset].addr
    # Get a random value for each thread/chunk of the seq
    let val = rand(100)
    # Set the values in parallel
    spawn setValue(chunkStart, chunkSize, val)
  # Wait for all threads to finish
  sync()
  # Check the result
  echo "After: ", s

main()
Output on my machine (36 cores):
Before: @[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
After: @[0, 0, 0, 37, 37, 37, 98, 98, 98, 18, 18, 18, 83, 83, 83, 15, 15, 15, 91, 91, 91, 48, 48, 48, 86, 86, 86, 10, 10, 10, 58, 58, 58, 78, 78, 78, 44, 44, 44, 3, 3, 3, 22, 22, 22, 17, 17, 17, 83, 83, 83, 42, 42, 42, 97, 97, 97, 24, 24, 24, 20, 20, 20, 55, 55, 55, 6, 6, 6, 98, 98, 98, 71, 71, 71, 66, 66, 66, 72, 72, 72, 12, 12, 88, 88, 83, 83, 55, 55, 74, 74, 40, 40, 21, 21, 46, 46, 95, 95, 0]
You can see that each thread got either 3 or 2 items. Naive chunking instead would give: 100/36 = 2.77; using 2 items per chunk would give 2 tasks for each of the first 35 threads and 100 - 2*35 = 30 tasks for the last one.
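The balanced split described above can be checked with a few lines, using the same div/mod arithmetic as the parallelChunks template and the numbers from the example run (99 iterations, i.e. 0 .. s.len-1, over 36 chunks):

```nim
# Same div/mod arithmetic as the parallelChunks template, with the numbers
# from the example run: 99 iterations over 36 chunks.
let
  numIters = 99
  numChunks = 36
  baseChunkSize = numIters div numChunks   # 2
  remainder = numIters mod numChunks       # 27
var total = 0
for threadID in 0 ..< numChunks:
  # The first `remainder` chunks take one extra item.
  total += (if threadID < remainder: baseChunkSize + 1 else: baseChunkSize)
echo baseChunkSize, " ", remainder, " ", total   # -> 2 27 99
```

So 27 chunks of 3 items plus 9 chunks of 2 items cover all 99 iterations, with no oversized last chunk.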
Thank you for your time and the detailed explanation 🙂, now I understand and it works across many physical cores.
Because I have to write data into a single file, the lines are now mixed up; I think I need channels (?) to solve this. I will read up on how that works, try it out, and ask again if I have problems :)
The issue is that to write to a file, you basically need to serialize the writes. Multithreading only helps for compute tasks that are CPU-bound. Writing to disk is very, very slow, even on an SSD. To give you an idea:
From What every programmer should know about latency
Latency Comparison Numbers (~2012)
----------------------------------
L1 cache reference 0.5 ns
Branch mispredict 5 ns
L2 cache reference 7 ns 14x L1 cache
Mutex lock/unlock 25 ns
Main memory reference 100 ns 20x L2 cache, 200x L1 cache
Compress 1K bytes with Zippy 3,000 ns 3 us
Send 1K bytes over 1 Gbps network 10,000 ns 10 us
Read 4K randomly from SSD* 150,000 ns 150 us ~1GB/sec SSD
Read 1 MB sequentially from memory 250,000 ns 250 us
Round trip within same datacenter 500,000 ns 500 us
Read 1 MB sequentially from SSD* 1,000,000 ns 1,000 us 1 ms ~1GB/sec SSD, 4X memory
Disk seek 10,000,000 ns 10,000 us 10 ms 20x datacenter roundtrip
Read 1 MB sequentially from disk 20,000,000 ns 20,000 us 20 ms 80x memory, 20X SSD
Send packet CA->Netherlands->CA 150,000,000 ns 150,000 us 150 ms
Notes
-----
1 ns = 10^-9 seconds
1 us = 10^-6 seconds = 1,000 ns
1 ms = 10^-3 seconds = 1,000 us = 1,000,000 ns
Credit
------
By Jeff Dean: http://research.google.com/people/jeff/
Originally by Peter Norvig: http://norvig.com/21-days.html#answers
Contributions
-------------
'Humanized' comparison: https://gist.github.com/hellerbarde/2843375
Visual comparison chart: http://i.imgur.com/k0t1e.png
On a 3 GHz CPU (i.e. ~3 billion basic instructions per second), a basic operation like add, sub, or, and, mul, shift, xor, if/then/else takes 1 s / 3e9 instr ≈ 0.33 ns/instr. At 150,000 ns per random 4 kB SSD read, that is 150,000 ns / 0.33 ns/instr ≈ 454,545 basic operations in the time required to read 4 kB from an SSD.
Unless you have very heavy processing to do before writing to disk you don't need to parallelize.
Hm yes that makes sense.
The input file contains a URL per line which has to be checked, and the result should be written into a file. Because this depends on network latency, I thought it would speed up the program if I spawned many connections, ~300-400 concurrently. But maybe I'm mistaken.
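For a network-bound job like that, async I/O is indeed the usual fit rather than threads. A hedged sketch of the pattern, where checkUrl is a stand-in that awaits sleepAsync instead of making a real request (with std/httpclient you would await AsyncHttpClient.getContent the same way):

```nim
import std/asyncdispatch

# Hedged sketch of concurrent URL checks: checkUrl is a stand-in that awaits
# sleepAsync instead of a real request; with std/httpclient you would await
# AsyncHttpClient.getContent the same way.
proc checkUrl(url: string): Future[string] {.async.} =
  await sleepAsync(10)              # placeholder for the network round trip
  return url & " -> ok"

proc main() {.async.} =
  let urls = @["http://a.example", "http://b.example", "http://c.example"]
  var futs: seq[Future[string]]
  for u in urls:
    futs.add checkUrl(u)            # all checks start; they wait concurrently
  for f in futs:
    echo await f                    # collect results; file writes stay serial

waitFor main()
```

All the futures are in flight at once, so total time is roughly one round trip instead of one per URL, and collecting the results in a single loop keeps the output file writes serialized.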