Hello!
I think this is probably a simple answer, but I've been banging on it for too long now. I want to process the lines of a file in parallel. The R code I am trying to emulate is:
library(parallel)
library(data.table)

# Read tab-separated input from stdin; the data has no header row.
df <- fread('cat /dev/stdin', header=FALSE, sep="\t")

# For every column, count the cells that contain "bc" (case-insensitive).
# Columns are processed in parallel, using up to one core per column.
totals <- mclapply(df, function(x) {
  sum(grepl("bc", x, ignore.case=TRUE))
}, mc.cores=ncol(df))

# Grand total across all columns.
print(sum(unlist(totals)))
Here is my Nim code:
import strutils, sequtils, threadpool
{.experimental: "parallel".}
func countBC(line: string): int =
  ## Counts the tab-separated fields of `line` whose characters at
  ## indices 1..3 contain "bc", ignoring ASCII case.
  ##
  ## Fields shorter than three characters cannot hold "bc" in that window
  ## and are skipped instead of raising an index error.
  for val in line.split('\t'):
    # Clamp the upper bound so fields of length 3 are still inspected.
    if val.len >= 3 and "bc" in toLowerAscii(val[1 .. min(3, val.high)]):
      inc(result)
proc main() =
  ## Reads all of stdin, runs `countBC` on every line inside a `parallel`
  ## block and prints the sum of the per-line counts.
  var lines = stdin.readAll.splitLines
  # A trailing newline leaves one empty element at the end. Drop only that
  # element, so a final line without a newline is still counted.
  if lines.len > 0 and lines[^1].len == 0:
    lines.setLen(lines.len - 1)
  # `newSeq` allocates `lines.len` zeroed slots; `newSeqOfCap` would only
  # reserve capacity and leave the length at 0, so no index would be valid.
  var counts = newSeq[int](lines.len)
  parallel:
    # Bounding `i` by both `high` values lets the compiler prove that each
    # index is in range for `counts` and for `lines`.
    for i in 0 .. min(counts.high, lines.high):
      counts[i] = spawn countBC(lines[i])
  echo foldl(counts, a + b, 0)

main()
The error I get is count_lines_parallel_nim.nim(18, 20) Error: cannot prove: i <= len(counts) + -1 (bounds check)
What do I need to do to let the compiler know that counts has the same length as lines?
Maybe try:
# Suggested workaround for the "cannot prove ... (bounds check)" error:
# bound the loop by the `high` of both sequences so the index used on
# `counts` and on `lines` cannot exceed either of them.
parallel:
  for i in 0 .. min(counts.high, lines.high):
    counts[i] = spawn countBC(lines[i])
A better bounds checker is coming soon (?)
Thank you @dom96 and @Araq! Both of your methods work! Unsurprisingly, since my actual function operating on each line doesn't do that much, building up a buffer to send to the threadpool is much faster.
import strutils, sequtils, threadpool, cpuinfo
{.experimental: "parallel".}
#[
Nim single threaded method takes about 4.5s.
R parallel takes about 3.5s
]#
#[ Method 2: use spawn only and chunk the input
Time 1.101s
]#
func countBC2(chunk: string): int =
  ## Counts, over every line of `chunk`, the tab-separated fields whose
  ## characters at indices 1..3 contain "bc", ignoring ASCII case.
  ##
  ## Fields shorter than three characters cannot hold "bc" in that window
  ## and are skipped instead of raising an index error.
  for line in chunk.splitLines():
    for val in line.split('\t'):
      # Clamp the upper bound so fields of length 3 are still inspected.
      if val.len >= 3 and "bc" in toLowerAscii(val[1 .. min(3, val.high)]):
        inc(result)
proc main2() =
  ## Reads stdin in large chunks, hands each chunk of whole lines to
  ## `countBC2` via `spawn`, and prints the sum of all results.
  ##
  ## Each chunk is cut at its last line terminator; the unfinished tail is
  ## carried to the front of the buffer and completed by the next read.
  const initialChunkSize = 1_000_000
  var
    responses = newSeq[FlowVar[int]]()
    buffer = newString(initialChunkSize)
    carry = 0 # bytes of an unfinished line kept at the start of `buffer`
  while not endOfFile(stdin):
    if carry == buffer.len:
      # One line fills the whole buffer: grow it, otherwise the next read
      # would request zero bytes and the loop could never advance.
      buffer.setLen(buffer.len * 2)
    let got = stdin.readChars(buffer, carry, buffer.len - carry)
    if got == 0:
      break
    let filled = carry + got
    # Walk back to just past the last line terminator in the filled part.
    var chunkLen = filled
    while chunkLen > 0 and buffer[chunkLen - 1] notin NewLines:
      dec chunkLen
    if chunkLen > 0:
      # The slice is a copy, so reusing `buffer` below cannot affect the task.
      responses.add(spawn countBC2(buffer[0 ..< chunkLen]))
    carry = filled - chunkLen
    # Move the tail to the front; source is ahead of destination, so an
    # ascending copy is safe even when the ranges overlap.
    for i in 0 ..< carry:
      buffer[i] = buffer[chunkLen + i]
  if carry > 0:
    # Input did not end with a newline: count the final partial line too.
    responses.add(spawn countBC2(buffer[0 ..< carry]))
  var total = 0
  for resp in responses:
    total += ^resp
  echo total
#[ Method 1: use parallel block
Time: 17.51
]#
func countBC(line: string): int =
  ## Counts the tab-separated fields of `line` whose characters at
  ## indices 1..3 contain "bc", ignoring ASCII case.
  ##
  ## Fields shorter than three characters cannot hold "bc" in that window
  ## and are skipped instead of raising an index error.
  for val in line.split('\t'):
    # Clamp the upper bound so fields of length 3 are still inspected.
    if val.len >= 3 and "bc" in toLowerAscii(val[1 .. min(3, val.high)]):
      inc(result)
proc main() =
  ## Method 1: collects every line of stdin, evaluates `countBC` on each
  ## line inside a `parallel` block, and prints the sum of the counts.
  let inputLines = toSeq(stdin.lines)
  # One zero-initialised result slot per input line.
  var perLine = newSeq[int](inputLines.len)
  parallel:
    # The `min` of both `high` values keeps the index provably in range
    # for both sequences.
    for idx in 0 .. min(perLine.high, inputLines.high):
      perLine[idx] = spawn countBC(inputLines[idx])
  var total = 0
  for n in perLine:
    total += n
  echo total
when isMainModule:
  # Entry point: Method 1 (`main`) is left commented out, so only
  # Method 2 (`main2`) runs.
  # main()
  main2()
Thank you both for your help! I am new to Nim and loving it!