Hello, World! I'm both new to this forum and new to using Nim!
I've got this issue where I'm trying to process some particle simulation data from a 32GB file using Nim. I've got the logic for the processing down so far (I'm porting my project over from Clojure to Nim; it's just some regular expressions and if statements), but I've got problems either with the multi-threading part of the program or with the part that sends each line of the file to a Channel[string].
My program gets killed by the Linux OOM Killer after using up 46868652kB (46.8GB) of virtual memory according to dmesg, so this is definitely a memory leak:
[ 4254.914720] oom-kill:constraint=CONSTRAINT_NONE,nodemask=(null),cpuset=/,mems_allowed=0,global_oom,task_memcg=/user.slice/user-1000.slice/[email protected],task=Malachite,pid=10951,uid=1000
[ 4254.914748] Out of memory: Killed process 10951 (Malachite) total-vm:46868652kB, anon-rss:3443884kB, file-rss:0kB, shmem-rss:0kB, UID:1000 pgtables:33872kB oom_score_adj:0
[ 4256.088282] oom_reaper: reaped process 10951 (Malachite), now anon-rss:0kB, file-rss:0kB, shmem-rss:0kB
Here's my code if anyone wants to check it: Malachite.nim (It depends on psutil. Please ignore the typos in the documentation, I didn't notice them before uploading the file...). I'm compiling it with nim c --threads:on -o:bin/Malachite src/Malachite.nim
The program reads a file called particles.txt which has 10 columns of numbers, but so far it only reads the first two numbers of each line so you can ignore the rest. If you want to test my program, you'll need a particles.txt file that looks something like this in the executable's path:
1 10
2 9
3 8
4 7
5 6
6 5
7 4
8 3
9 2
10 1
That's enough for the program to run, though you can make the file longer if you want to test it further (no zeroes, and no repeating the same number within a line, though!)
The thing that leads me to believe that something is wrong with my installation/setup is that I've been getting help from someone in /r/nim, and they say that the holderEcho procedure, which is broken on my system (it gets stuck in an infinite loop when the end of the file is reached; I described the issue further in its documentation), works perfectly fine on theirs. So perhaps there is something wrong with my installation/setup that's messing up the multi-threading too (or my code could just be broken, that's also a possibility!)
Any and all help is appreciated!
Adapting your program along the above lines, I got something that used about 14x less CPU and 40% of the memory but still leaked. (I just concatenated your sample data together 2**20 times.) Sorry - not sure what the problem is there. It leaks with the default gc, --gc:orc, --gc:boehm, and --gc:markAndSweep (the last leaks the least). Maybe there is some special clean-up that Channel[T] or its helpers require and someone around here who uses them more can say what? I've never used it before now.
Really though, reconsider your entire approach to parallelism. Since you can map the whole file, instead of "fine-grained" line-by-line parallelism with all its related costs, you might be able to partition the entire input into nCpu independent chunks and parallelize over those chunks. Whether this can work depends upon how independent all the work is. Your current setup makes it look super independent, like lines can be processed in any old order.
If this can work, you need only divide the size by nCpu and then scan forward to the first newline after each i*stride byte offset and record the "chunks" with those newline boundaries. Then you spawn a thread per giant chunk. You don't need Channel[T] at all for this coarse-grained parallelism.
You might have to pass a threadId (c from your spawn loop) and open a separate output file for each c. Then you may need to post-process the nCpu output files to combine the results (if cat is not enough).
I would expect the memory leak to go away under such a reconception. This should also run much faster since there is no real coordination required between threads. With this structure, you can probably eliminate all dynamic allocation entirely. Processing a nearly equal amount of data will probably make them run close enough to the same amount of time. You just wait for them all to finish. Yes, some will finish a little early, but that "error" is likely to be dramatically less than the cost of coordinating threads over 32 GiB of lines. You could have just threadId == 0 print out a progress report (which again can be a byte percentage not a line percentage). If you really need to sequence things, you can emit a tag in each output line and sort by that, but this obviously could be costly. It may still cost less than line-by-line parallelism, though.
Hey, thank you so much for your detailed reply!
I haven't done multi-threading before, so this is my first proper go at it. Having a Channel[MemSlice] with just a few MemSlices on the Channel and having the threads read the file at their own pace makes a lot more sense than what I'm doing.
I'm currently working on reading the file using memfiles.open(). Can you tell me how to access the lines of the file from within a MemSlice? I can't seem to find any documentation on how to extract data from them.
Thank you so much for your time, by the way. I really appreciate it :)
You're welcome. To directly answer your question, see let line = $readAttempt.msg below. That $ converts the MemSlice to a Nim string.
Below is my simplification of what you had (with isHeader set up to work on a million replications of your sample input). Note that your Holder concept was going to be a mishmash of data from multiple threads written by multiple threads at slightly different times. I just removed it in favor of printing out from the first thread, relying on statistical uniformity of both line-break density and processing time:
import os, osproc, strutils, re, threadpool, memfiles

type
  Collision = enum BallBall, MineralMineral,
                   BallMineral, None

proc `$`(collision: Collision): string =
  case collision:
  of BallBall: "BallBall"
  of BallMineral: "BallMineral"
  of MineralMineral: "MineralMineral"
  else: "None"

proc collisionType(ids: seq[float]): Collision =
  if (ids[0] < 700000) and (ids[1] < 700000):
    BallBall
  elif ((ids[0] < 700000) and (ids[1] > 700000)) or
       ((ids[0] > 700000) and (ids[1] < 700000)):
    BallMineral
  elif (ids[0] > 700000) and (ids[1] > 700000):
    MineralMineral
  else: None                    # should not happen

proc takeOnly(nums: seq[uint], line: string): seq[float] =
  let lineSeq = split(line, ' ')
  for num in nums: result.add(lineSeq[num].parseFloat)

var file: MemFile
var fileChan: Channel[MemSlice]
var done = false
var lineCnt, byteCnt: int

proc readPercent: string =
  formatFloat(byteCnt / file.size * 100, ffDecimal, 3)

proc fileToChannel =
  ## Send slices from particles.txt to fileChan
  file = memfiles.open("particles.txt")
  for ms in file.memSlices:
    byteCnt = cast[int](ms.data) -% cast[int](file.mem)
    lineCnt.inc
    fileChan.send ms
  done = true
  file.close

proc isHeader(line: string): bool = false
  ## Checks if the line is a "header"; (intermingled)
  # findAll(line, re"-?[\d.]+(?:e-?\d+)?").len < 3 or
  # contains(line, re"c_fc+")

proc processReading(c: int) =
  const cols01 = @[0'u, 1'u]
  var i = 0
  while not done:
    i.inc
    let readAttempt = fileChan.tryRecv()
    if readAttempt.dataAvailable:
      let line = $readAttempt.msg
      if line.isHeader: continue
      let cType = collisionType(takeOnly(cols01, line))
      if c == 0 and i mod 100000 == 0:
        echo "Line ", lineCnt, " (", readPercent(),
             "%), Type: ", $cType

when isMainModule:
  fileChan.open()
  # spawn processReading(0)     # over 10x faster
  for c in 0..<countProcessors(): spawn processReading(c)
  fileToChannel()
  sync()
  fileChan.close()
With nim-devel and nim c --gc:markAndSweep -d:danger --threads:on it leaks about 0.1% of the file data. As you can tell by swapping the for c in loop for spawn processReading(0), real-time progress is over 10x faster not doing threads. This is the large overhead of line-by-line parallelism I mentioned, made concrete. If you are planning to add much more than 10x more work per line, that may not be so bad; otherwise overhead eats your potential parallel speed-up.
Yes, you can also increase work granularity by making the message type array[64, MemSlice]..buffering them up before sending, having a special value like MemSlice(data: nil, size: 0) signify the end of each batch, and doing a nested loop on the processing side.
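Untested sketch of that batching idea, with a single consumer for brevity (Batch and the nil-data sentinel are just illustrative names, not from your program):
import memfiles, threadpool

type Batch = array[64, MemSlice]       # 64 lines per message

var batchChan: Channel[Batch]

proc consumer() =
  var stop = false
  while not stop:
    let b = batchChan.recv
    for ms in b:                       # nested loop on the processing side
      if ms.data == nil:               # sentinel: end of the final batch
        stop = true
        break
      discard $ms                      # real per-line work goes here

when isMainModule:
  batchChan.open()
  spawn consumer()
  var f = memfiles.open("particles.txt")
  var b: Batch
  var n = 0
  for ms in f.memSlices:
    b[n] = ms; n.inc
    if n == b.len:                     # batch full: send & reset
      batchChan.send b; n = 0
  b[n] = MemSlice(data: nil, size: 0)  # mark the short, final batch
  batchChan.send b
  sync()                               # let consumer finish before unmapping
  f.close
  batchChan.close()
Sending 64 slices per message amortizes the per-send synchronization cost over 64 lines, which is the whole point of the exercise.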
However, for such large files, as mentioned in my prior post, it would be best to just remove inter-thread communication. Instead, rely upon statistical uniformity of line break density and processing time and create nCPU big chunks. In your context where you have the data in a big file on a random access medium, the only reason not to do this is if order matters, but you also have nothing currently in place to ensure order. So, if order does matter then you already have other problems.
Here..Something like this:
import memfiles # here to `nSplit` maybe~~> own module

proc last*(ms: MemSlice): char =
  cast[ptr char](cast[int](ms.data) +% ms.size - 1)[]

proc align(ms: MemSlice): MemSlice =
  result = ms
  while result.size > 0 and result.last != '\n':
    result.size.inc

proc split*(n: int, path: string):
    tuple[mf: MemFile; parts: seq[MemSlice]] =
  if n < 2: raise newException(ValueError, "n < 2")
  let mf = memfiles.open(path)
  var parts = newSeq[MemSlice](n)
  let step = mf.size div n
  parts[0] = align(MemSlice(data: mf.mem, size: step))
  for c in 1 ..< max(1, n - 1):
    let d = cast[pointer](cast[int](parts[c-1].data) +%
                          parts[c-1].size)
    parts[c] = align(MemSlice(data: d, size: step))
  let d = cast[int](parts[^2].data) +% parts[^2].size
  parts[^1].data = cast[pointer](d)
  parts[^1].size = mf.size - (d -% cast[int](mf.mem))
  result = (mf, parts)

type NumberedToks* = tuple[liNo: int; ms: MemSlice]

iterator nSplit*(ms: MemSlice, delim: char): NumberedToks =
  proc memchr(s: pointer, c: char, n: csize): pointer
    {.importc: "memchr", header: "<string.h>".}
  var left = ms.size            # assert ms.last == delim
  var res: NumberedToks
  res.ms.data = cast[pointer](ms.data)
  var e = memchr(res.ms.data, delim, left)
  if e == nil: raise newException(IOError, "Bad Format")
  res.ms.size = cast[int](e) -% cast[int](res.ms.data)
  while e != nil and left > 0:
    res.liNo.inc
    yield res
    left.dec res.ms.size + 1
    res.ms.data = cast[pointer](cast[int](res.ms.data) +%
                                (res.ms.size + 1))
    e = memchr(res.ms.data, delim, left)
    res.ms.size = cast[int](e) -% cast[int](res.ms.data)
import osproc, threadpool, strutils #, re

type ##### Application Logic #####
  Collision = enum BallBall, MineralMineral, BallMineral,
                   None

proc `$`(collision: Collision): string =
  case collision:
  of BallBall: "BallBall"
  of BallMineral: "BallMineral"
  of MineralMineral: "MineralMineral"
  else: "None"

proc collisionType(ids: seq[int]): Collision =
  if (ids[0] < 700000) and (ids[1] < 700000): BallBall
  elif ((ids[0] < 700000) and (ids[1] > 700000)) or
       ((ids[0] > 700000) and (ids[1] < 700000)):
    BallMineral
  elif (ids[0] > 700000) and (ids[1] > 700000):
    MineralMineral
  else: None                    # should not happen

proc isHeader(line: string): bool = false

proc doLines(c: int, ms: MemSlice) =
  const cols01 = {0'i16, 1'i16}
  var ids = newSeq[int](1 + 1)  # max cols01 index+1
  var line: string              # re-used buffer
  for i, s in ms.nSplit('\n'):
    line.setLen s.size; copyMem line[0].addr, s.data, s.size
    if line.isHeader: continue
    var j = 0'i16               # populate `ids`
    for col in line.split:
      if j in cols01: ids[j] = col.parseInt
      j.inc                     # echo c, ",", i, "; ids: ", ids
    let cType = ids.collisionType
    if c == 0 and i mod 2000000 == 0:  # progress report
      let did = cast[int](s.data) -% cast[int](ms.data)
      let pct = formatFloat(did/ms.size*100, ffDecimal, 2)
      echo pct, "%: Type: ", $cType

when isMainModule:
  var (mf, parts) = countProcessors().split("particles.txt")
  for c, part in parts: spawn doLines(c, part)
  sync()
  mf.close                      # not really necessary at end of process
I wrote it just for you for this example and have only tested it a little, but it seems to work fine, should "scale up" well with nCPU, and avoids most dynamic memory allocation. I get 50..75 MB/s/CPU, but your mileage may vary. Maybe someone out there can see some easy way to speed it up.
I would re-iterate that care in implementing isHeader (maybe combining it with the j tokenization) should pay dividends. Unlike scripting languages, char-by-char parsing (like proc align here) is not so bad in Nim. Yes, SSE/AVX vectorized memchr can be faster, but regexes are usually much slower.
Also, I changed ids to be seq[int] because parseInt is much faster than parseFloat and because "identity" typically means exact equality. While a float can serve in a pinch for small integers, I think it gives the wrong impression.
Oh dear god, you actually rewrote the entire program. I sure wasn't expecting that, but I really appreciate it and I'm testing it out right now.
I had to make some modifications to the code you wrote because, though the IDs for the particles (the first two numbers we are reading right now) are integers, the rest of the line is floats written in scientific notation. Even the IDs are written in scientific notation after a certain point, which parseInt can't handle, even when they are whole numbers. The other thing I had to do was keep the isHeader procedure with the regular expressions, because otherwise it's impossible to skip the headers, which are scattered all throughout the file in unpredictable places...
I should've given you an idea of what the file actually looks like:
ITEM: TIMESTEP
5000
ITEM: NUMBER OF ENTRIES
19373
ITEM: BOX BOUNDS ff ff ff
-7 8.23
-6.15 6.15
-6.15 6.15
ITEM: ENTRIES c_fc[1] c_fc[2] c_fc[3] c_fc[4] c_fc[5] c_fc[6] c_fc[7] c_fc[8] c_fc[9] c_fc[10]
4137 177 0 101.829 -22.011 -6.05931 0.000126405 0.000315929 0.000972436 1.32191e-05
177 40 0 -131.062 894.443 -249.542 0.0480773 -0.00876547 -0.0565917 0.000102555
177 52 0 1747.36 647.679 -421.698 0.0238214 -0.224456 -0.245182 0.00016992
177 147 0 -1424.65 -1591.88 -213.59 0.156407 -0.162987 0.174167 0.000172695
4163 101 0 89.0861 308.405 -93.7186 0.0609233 -0.0363505 -0.0614887 7.99238e-05
4163 130 0 -8.23935 -148.497 -162.485 0.0245261 -0.0331942 0.0290742 6.40629e-05
4163 3679 0 -42.8571 -1.44692 -13.4055 -0.00748773 -0.00808038 0.0248318 3.96479e-05
142 123 0 -324.159 -248.417 104.921 -0.0172546 0.00798906 -0.0343471 6.60412e-05
170 80 0 -464.968 -510.257 254.546 -0.0461583 0.0251958 -0.0336903 8.65581e-05
170 169 0 633.359 -386.171 -552.566 0.000807178 0.0528618 -0.0361176 8.56076e-05
9 650 0 56.7877 19.9552 -1.05125 0.00499958 -0.0129203 0.0233127 3.93026e-05
9 527 0 3196.9 -2367.63 567.947 -0.42589 -0.675974 -0.425321 0.000336805
Everything from ITEM: TIMESTEP up to (and including) the ITEM: ENTRIES c_fc[1] c_fc[2] ... line is a header, and these headers appear all over the file.
The other problem (and this is the big one) is that I do need some sort of global variable to store the results of the calculations from the doLines procedure.
What I have to do is add up all the forces into separate variables for each type of collision. For example, let's say we have:
4137 177 0 101.829 -22.011 -6.05931 0.000126405 0.000315929 0.000972436 1.32191e-05
, and that it is a BallBall collision; every value after the 0 would be added to its own column-respective and collision-respective variable. So 101.829 would be added to var ballBall4thCol: float, for example. But this would end up with more data races, so I have no idea how to deal with that, other than maybe having doLines return some data type that holds all of those values and adding up the values from the threads after they are done, but I don't know if that's possible :(

Ok, Re: ids. Wasn't sure of the format. Kind of icky to do ids in scientific notation if you ask me...but some languages/formats are more type-impoverished than Nim. ;-)
Even experienced people overuse regular expressions for simple things constantly. Instead of regular expressions, just test line.len > 0 and line[0] in {'A'..'Z'}. (Looks like you may even be able to say line.startsWith("ITEM") or even faster line[0]=='I'.) { Feel free to try both ways and time the difference. }
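E.g., a throwaway micro-benchmark like this sketch can time both predicates (made-up sample lines; std/monotimes for timing; loop counts are arbitrary):
import std/[monotimes, times, re]

let lines = @["ITEM: TIMESTEP", "4137 177 0 101.829 -22.011 -6.05931"]
let hdrRe = re"c_fc+"

template bench(label: string, body: untyped) =
  let t0 = getMonoTime()
  for _ in 1 .. 1_000_000:
    for line {.inject.} in lines:
      discard body                    # evaluate the predicate under test
  echo label, ": ", (getMonoTime() - t0).inMilliseconds, " ms"

bench "regex":      contains(line, hdrRe)
bench "first byte": line.len > 0 and line[0] == 'I'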
Re: the big problem, just make a global seq[] indexed by thread id c. Maybe a seq of some new array/object/tuple type if you need. Total up things in each doLines. When they are all done after the sync, total the subtotals { single-threaded is fine for that since you don't have many CPUs. :-) }. Then, as the French say, et voilà ! ;-)
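In miniature, that pattern is something like this sketch (untested; a toy sum stands in for your real per-chunk work, and passing each thread a ptr to its own slot keeps spawn happy about GC safety):
import osproc, threadpool

proc work(subTotal: ptr float; c: int) =
  for x in 1 .. 1000:
    subTotal[] += float(x)            # each thread writes only its own slot

var subTotals = newSeq[float](countProcessors())
for c in 0 ..< subTotals.len:
  spawn work(subTotals[c].addr, c)
sync()                                # all threads done; safe to read
var total = 0.0
for s in subTotals: total += s        # single-threaded grand total
echo total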
Also, I only ever said kinda implicitly that you did not have an installation problem. You had an end-of-loop termination bug where only one thread got told to quit. (The friend/other person may have had only 2 CPUs in a VM or something and avoided that problem. Just a guess.)
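For concreteness, the usual fix for that kind of bug is one quit sentinel per worker, so every thread (not just one) gets told to stop..an untested sketch along your original Channel lines (the chunked approach above sidesteps all of this):
import osproc, threadpool, memfiles

var chan: Channel[MemSlice]

proc worker() =
  while true:
    let ms = chan.recv
    if ms.data == nil: break          # this thread's own quit sentinel
    discard $ms                       # real per-line work goes here

when isMainModule:
  chan.open()
  let nThreads = countProcessors()
  for _ in 1 .. nThreads: spawn worker()
  var f = memfiles.open("particles.txt")
  for ms in f.memSlices: chan.send ms
  for _ in 1 .. nThreads:             # one sentinel per consumer
    chan.send MemSlice(data: nil, size: 0)
  sync()                              # all workers exit before unmapping
  f.close
  chan.close()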
Update #2: Aaaaand I broke it again while trying to implement a return value for doLines...
Now I'm getting SIGSEGV: Illegal storage access. (Attempt to read from nil?), and I don't really know why. Am I missing something obvious?
# Malachite.nim

# native imports
import osproc, threadpool, strutils, memfiles
# module imports
import cblakeutils

type
  Collision = enum
    BallBall, MineralMineral, BallMineral, None
  NoneException = object of ValueError # Raised when a `None` is found
  Holder = object
    fourth, fifth, sixth, seventh, eighth, ninth, tenth: float
  Accumulator = ref object
    ballBallHolder, ballMineralHolder, mineralMineralHolder: Holder

proc `$`(collision: Collision): string =
  result =
    case collision:
    of BallBall: "BallBall"
    of BallMineral: "BallMineral"
    of MineralMineral: "MineralMineral"
    else: "None"

proc `$`(accumulator: Accumulator): string =
  result =
    "The accumulated forces in particles.txt are:\n" &
    "\tBallBall Collisions:\n" &
    "\t\tc_cfc[4]: " & $accumulator.ballBallHolder.fourth & '\n' &
    "\t\tc_cfc[5]: " & $accumulator.ballBallHolder.fifth & '\n' &
    "\t\tc_cfc[6]: " & $accumulator.ballBallHolder.sixth & '\n' &
    "\t\tc_cfc[7]: " & $accumulator.ballBallHolder.seventh & '\n' &
    "\t\tc_cfc[8]: " & $accumulator.ballBallHolder.eighth & '\n' &
    "\t\tc_cfc[9]: " & $accumulator.ballBallHolder.ninth & '\n' &
    "\t\tc_cfc[10]: " & $accumulator.ballBallHolder.tenth & "\n\n" &
    "\tBallMineral Collisions:\n" &
    "\t\tc_cfc[4]: " & $accumulator.ballMineralHolder.fourth & '\n' &
    "\t\tc_cfc[5]: " & $accumulator.ballMineralHolder.fifth & '\n' &
    "\t\tc_cfc[6]: " & $accumulator.ballMineralHolder.sixth & '\n' &
    "\t\tc_cfc[7]: " & $accumulator.ballMineralHolder.seventh & '\n' &
    "\t\tc_cfc[8]: " & $accumulator.ballMineralHolder.eighth & '\n' &
    "\t\tc_cfc[9]: " & $accumulator.ballMineralHolder.ninth & '\n' &
    "\t\tc_cfc[10]: " & $accumulator.ballMineralHolder.tenth & "\n\n" &
    "\tMineralMineral Collisions:\n" &
    "\t\tc_cfc[4]: " & $accumulator.mineralMineralHolder.fourth & '\n' &
    "\t\tc_cfc[5]: " & $accumulator.mineralMineralHolder.fifth & '\n' &
    "\t\tc_cfc[6]: " & $accumulator.mineralMineralHolder.sixth & '\n' &
    "\t\tc_cfc[7]: " & $accumulator.mineralMineralHolder.seventh & '\n' &
    "\t\tc_cfc[8]: " & $accumulator.mineralMineralHolder.eighth & '\n' &
    "\t\tc_cfc[9]: " & $accumulator.mineralMineralHolder.ninth & '\n' &
    "\t\tc_cfc[10]: " & $accumulator.mineralMineralHolder.tenth & "\n\n" &
    "Thank you for using Malachite.\n"

proc collisionType(ids: seq[float]): Collision =
  ## Returns BallBall if there was a ball-to-ball collision,
  ## BallMineral if there was a ball-to-mineral collision,
  ## MineralMineral if there was a mineral-to-mineral collision;
  ## otherwise, returns None.
  result =
    if (ids[0] < 700000) and (ids[1] < 700000):
      BallBall
    elif ((ids[0] < 700000) and (ids[1] > 700000)) or
         ((ids[0] > 700000) and (ids[1] < 700000)):
      BallMineral
    elif (ids[0] > 700000) and (ids[1] > 700000):
      MineralMineral
    else:
      None # This case should never happen, but it's here just in case.

proc isHeader(line: string): bool =
  ## Checks if the line is a "header" line.
  ## Headers are found all through the particles.txt file.
  result =
    if line[0] == 'I' or count(line, ' ') < 2: # With fewer than three numbers there'll be fewer than 2 spaces.
      true
    else: false

proc doLines(c: int, ms: MemSlice): Accumulator =
  echo "test 1"
  const
    cols01 = {0'i16, 1'i16}
    colForces = {3'i16, 4'i16, 5'i16, 6'i16, 7'i16, 8'i16, 9'i16}
  var
    ids = newSeq[float](2) # max cols01 index+1
    forces = newSeq[float](7)
    line: string # re-used buffer
  echo "test 2"
  for i, s in ms.nSplit('\n'):
    line.setLen s.size
    copyMem line[0].addr, s.data, s.size
    if line.isHeader:
      continue
    var j = 0'i16 # populate `ids`
    for col in line.split:
      if j in cols01:
        ids[j] = col.parseFloat
      j.inc
    var k = 0'i16 # populate `forces`
    for col in line.split:
      if k in colForces:
        forces[k] = col.parseFloat
      k.inc
    let cType = ids.collisionType
    case cType:
    of BallBall:
      result.ballBallHolder.fourth += forces[0]
      result.ballBallHolder.fifth += forces[1]
      result.ballBallHolder.sixth += forces[2]
      result.ballBallHolder.seventh += forces[3]
      result.ballBallHolder.eighth += forces[4]
      result.ballBallHolder.ninth += forces[5]
      result.ballBallHolder.tenth += forces[6]
    of BallMineral:
      result.ballMineralHolder.fourth += forces[0]
      result.ballMineralHolder.fifth += forces[1]
      result.ballMineralHolder.sixth += forces[2]
      result.ballMineralHolder.seventh += forces[3]
      result.ballMineralHolder.eighth += forces[4]
      result.ballMineralHolder.ninth += forces[5]
      result.ballMineralHolder.tenth += forces[6]
    of MineralMineral:
      result.mineralMineralHolder.fourth += forces[0]
      result.mineralMineralHolder.fifth += forces[1]
      result.mineralMineralHolder.sixth += forces[2]
      result.mineralMineralHolder.seventh += forces[3]
      result.mineralMineralHolder.eighth += forces[4]
      result.mineralMineralHolder.ninth += forces[5]
      result.mineralMineralHolder.tenth += forces[6]
    of None:
      raise newException(NoneException,
        "There is something wrong with the collisionType procedure, no collision found!")
    if c == 0 and i mod 2000 == 0: # progress report
      let
        did = cast[int](s.data) -% cast[int](ms.data)
        percent = formatFloat(did/ms.size*100, ffDecimal, 2)
      echo ($percent, "%: Type", $cType)

proc addAccumulators(accs: seq[FlowVar[Accumulator]]): Accumulator =
  for acc in accs:
    let a = ^acc
    result.ballBallHolder.fourth += a.ballBallHolder.fourth
    result.ballBallHolder.fifth += a.ballBallHolder.fifth
    result.ballBallHolder.sixth += a.ballBallHolder.sixth
    result.ballBallHolder.seventh += a.ballBallHolder.seventh
    result.ballBallHolder.eighth += a.ballBallHolder.eighth
    result.ballBallHolder.ninth += a.ballBallHolder.ninth
    result.ballBallHolder.tenth += a.ballBallHolder.tenth
    result.ballMineralHolder.fourth += a.ballMineralHolder.fourth
    result.ballMineralHolder.fifth += a.ballMineralHolder.fifth
    result.ballMineralHolder.sixth += a.ballMineralHolder.sixth
    result.ballMineralHolder.seventh += a.ballMineralHolder.seventh
    result.ballMineralHolder.eighth += a.ballMineralHolder.eighth
    result.ballMineralHolder.ninth += a.ballMineralHolder.ninth
    result.ballMineralHolder.tenth += a.ballMineralHolder.tenth
    result.mineralMineralHolder.fourth += a.mineralMineralHolder.fourth
    result.mineralMineralHolder.fifth += a.mineralMineralHolder.fifth
    result.mineralMineralHolder.sixth += a.mineralMineralHolder.sixth
    result.mineralMineralHolder.seventh += a.mineralMineralHolder.seventh
    result.mineralMineralHolder.eighth += a.mineralMineralHolder.eighth
    result.mineralMineralHolder.ninth += a.mineralMineralHolder.ninth
    result.mineralMineralHolder.tenth += a.mineralMineralHolder.tenth

when isMainModule:
  var
    (mf, parts) = countProcessors().split("particles.txt")
    accSeq = newSeq[FlowVar[Accumulator]](countProcessors())
  for c, part in parts:
    var acc = spawn doLines(c, part)
    accSeq[c] = acc
  sync()
  echo "Done processing!"
  echo $addAccumulators(accSeq)
  mf.close() # not really necessary at the end of the process
Also, do you think I'm defining accSeq right? I'm not sure if it's okay or not, since I haven't been able to test it yet :(

To get it working, you don't need any FlowVar stuff. You can just pass a per-thread ptr to doLines. By design, each of these threads is fully independent, just running to completion over its slice. Below is my version, just cut&pasting my suggested module into cblakeutils.nim in the same directory.
Stylistically, if you want to emphasize Nim stdlib vs other imports, I suggest using std/[modA, modB,..], modC, which has the added bonus of being more syntactically meaningful than comments. I thought arrays and loops over 4..10 were far superior. I renamed the rather vague Accumulator to ForceTotal and tightened up some formatting to help keep the Forum post size smaller. Also, I inlined the logic of isHeader into the parsing so you do not need to do 3 passes over each line (one for count, one for ids, and one for forces). I also made ids and forces into array not seq since their sizes are known at compile time. Also, your "thanks for using" text is not related to the data type, so putting it in $ is non-idiomatic; I moved it to the echo and also fixed your echo (..), which was formatting a tuple, not varargs. {You could also just delete the space between echo and the first (.}
It obviously depends upon how many CPU cores/threads you are using, but for 4 threads and 19 GiB, I get like 200 progress reports with 500_000. Your 2_000 may be so small as to actually slow things down with progress output, especially if the output is to a slow terminal emulator.
import std/[osproc, threadpool, strutils, memfiles, os,
            strformat], cblakeutils

type
  Collision = enum
    BallBall, MineralMineral, BallMineral, None
  NoneException = object of ValueError # `None` found
  ForceTotal = object
    ballBall, ballMineral, mineralMineral:
      array[range[4..10], float]

proc `$`(collision: Collision): string =
  result =
    case collision:
    of BallBall: "BallBall"
    of BallMineral: "BallMineral"
    of MineralMineral: "MineralMineral"
    else: "None"

proc `$`(a: ForceTotal): string =
  result = "Accumulated forces in particles.txt are:\n" &
           "\tBallBall Collisions:\n"
  for j in 4..10:
    result.add &"\t\tc_cfc[{j}]: " & $a.ballBall[j] & '\n'
  result.add "\tBallMineral Collisions:\n"
  for j in 4..10:               # Wrap lines to block Forum H.ScrollBar
    result.add &"\t\tc_cfc[{j}]: " &
               $a.ballMineral[j] & '\n'
  result.add "\tMineralMineral Collisions:\n"
  for j in 4..10:
    result.add &"\t\tc_cfc[{j}]: " &
               $a.mineralMineral[j] & '\n'

proc collisionType(ids: array[2, float]): Collision =
  ## Returns `Collision` kind, including `None`.
  result =
    if (ids[0] < 700000) and (ids[1] < 700000): BallBall
    elif ((ids[0] < 700000) and (ids[1] > 700000)) or
         ((ids[0] > 700000) and (ids[1] < 700000)):
      BallMineral
    elif (ids[0] > 700000) and (ids[1] > 700000):
      MineralMineral
    else: None                  # Should not happen

proc doLines(acc: ptr ForceTotal, c: int, ms: MemSlice) =
  const
    colIds    = {0'i8, 1'i8}
    colForces = {3'i8, 4'i8, 5'i8, 6'i8, 7'i8, 8'i8, 9'i8}
  var
    ids: array[2, float]        # particle ids
    forces: array[4..10, float] # cfc forces
    line: string                # re-used buffer
  for i, s in ms.nSplit('\n'):
    line.setLen s.size
    copyMem line[0].addr, s.data, s.size
    if line.len < 1 or line[0] == 'I': # header line
      continue
    var j = 0'i8                # fill ids & forces
    for col in line.split:
      if j in colIds : ids[j] = col.parseFloat
      elif j in colForces: forces[j+1] = col.parseFloat
      j.inc
    if j < 10: continue         # header line
    let cType = ids.collisionType
    case cType:
    of BallBall:
      for k in 4..10: acc.ballBall[k] += forces[k]
    of BallMineral:
      for k in 4..10: acc.ballMineral[k] += forces[k]
    of MineralMineral:
      for k in 4..10: acc.mineralMineral[k] += forces[k]
    of None:
      raise newException(NoneException, "No Collision type")
    if c == 0 and i mod 500_000 == 0: # progress report
      let
        did = cast[int](s.data) -% cast[int](ms.data)
        pct = formatFloat(did/ms.size*100, ffDecimal, 2)
      echo $pct, "%: Type ", $cType

proc total(accs: seq[ForceTotal]): ForceTotal =
  for a in accs:
    for j in 4..10:
      result.ballBall[j] += a.ballBall[j]
    for j in 4..10:
      result.ballMineral[j] += a.ballMineral[j]
    for j in 4..10:
      result.mineralMineral[j] += a.mineralMineral[j]

when isMainModule:
  let nThr0 = parseInt(getEnv("NT", "0"))
  let nThr = if nThr0 == 0: countProcessors() else: nThr0
  var (mf, parts) = nThr.split("particles.txt")
  var accs = newSeq[ForceTotal](nThr)
  for c, part in parts:
    spawn doLines(accs[c].addr, c, part)
  sync()
  echo $accs.total, "\nThank you for using Malachite."
  # mf.close()                  # not so helpful at the end of process
I would expect this problem to be IO-bound at >= ~4 CPU cores. With nim c -d:danger --gc:markAndSweep --threads:on, on a crappy 12-year-old Intel i5 M540 @ 2.53GHz with only 1.75 GiB RAM, no swap, and an Intel SSD, I just processed 2**24 copies of your more complete 21-line test input (19344130048 bytes) in 105 seconds. wc -l took 70 seconds on the same input & machine { and constructing the input file took several minutes }. On a fast desktop it took 26 seconds, and on a bigger 32-core CPU it ran in just 6.5 seconds..hardly even in need of progress reports, TBH.
Needless to say, FORTRAN taking days instead of seconds for 32 GiB on a big machine sounds like some pretty bad programming, even single-threaded..something like 10,000X too slow. There may still be bugs in mine or something, but I doubt fixing any of that would alter the basic performance much from what you have so far indicated of the computation.
And you should definitely construct a small test case that hits all the branches/kinds of collision and confirm the output of various versions of the program on that/use it for debugging.
Also, I profiled the program with Linux perf and like 41% of the time was in nimParseBiggestFloat & nsuParseFloat. Nim's stdlib impl is slow but correct for edge-case inputs. If that 26 sec or 6.5 sec or whatever the fully tested & debugged version winds up at on your "big computer" is still too slow, you may be able to get it down to ~65% of that time by taking code from this PR and dumping it into cblakeutils.nim under parseFloatFast or some such (65%, not 59%, because ASCII->binary does take some time).
It sounds from the run-times that you've been dealing with that this is unlikely to be a problem. I mention it for completeness and under a theory that it may help you more in some other circumstance where number parsing takes an even bigger time fraction. A best answer in this space is usually to "parse just once" and save the answer in a binary file that can be just mmap()d and used as-is, not re-parse over and over again. (You can have a little debugging utility to print the binary file or something.)
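A minimal, untested sketch of that parse-once idea (illustrative file names; assumes the first-byte header test from before and that every data column is wanted):
import memfiles, strutils, streams

proc textToBinary(txt, bin: string) =
  ## One-time conversion: text columns -> packed binary float64.
  var f = memfiles.open(txt)
  var o = newFileStream(bin, fmWrite)
  for ms in f.memSlices:
    let line = $ms
    if line.len < 1 or line[0] == 'I': continue # skip header lines
    for col in line.split:
      o.write col.parseFloat                    # 8 raw bytes per number
  o.close
  f.close

proc useBinary(bin: string) =
  ## Later runs: mmap the binary file; no re-parsing at all.
  var f = memfiles.open(bin)
  let nums = cast[ptr UncheckedArray[float]](f.mem)
  for i in 0 ..< f.size div 8:
    discard nums[i]                             # use the numbers directly
  f.close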
It only takes 21 minutes to process the 32GB file! Admittedly, counting up the values of the columns isn't the calculation I need to do, but I can work on my own from there :)
Now I just have to port the functions for the calculations from the FORTRAN program and I'll be done.
I do have an issue with collisionType, but that one is macro related, so I'll start a new thread for that one.
Thank you so much, you helped me a lot :)
For what it's worth, 21 minutes sounds too slow (or rather 32000/21/60 = 25 MB/s sounds too slow). Were you careful to compile with -d:danger? It is possible, of course, that the data is off some slow IO device | contended network filesystem | ...
Also, if it helps, I added some file & slice partitioning utilities (better tested in edge cases) to cligen with a similar but distinct API to what we discussed here.
cligen/examples/linect.nim has a simple line counting example that doubles as a way to show both how to use the API and how to assess how well dividing by bytes and scanning to the next record works at creating "even" partitions by the task sets represented by each line (or elsewise delimited variable length record).
If you don't mind CLI options instead of a config file, cligen may be another solution to your not-macro-related user-input problem. E.g., the program could take --ball-ranges & --mineral-ranges as CLI options. The most natural thing there would be to define your own argParse/argHelp to parse & print ranges, though arguably ranges may be common enough I could maybe add them to cligen/argcvt.
Oh god, yeah... I totally forgot about -d:danger, now it only takes 2 minutes!
I'm planning on using a simple GTK UI to get input from the user (apparently command lines are too scary and complicated...)