c********w 发帖数: 308 | 1 需要做个简单的Java工具,input(从Unix pipeline 来) 是一千多万行binary msg.
每行被decode 成一个json msg.然后apply 一个filter 最后输出所有matching json
data.
如果decoder 和 filter 已经很优化了。有什么方法或者工具可以提高处理速度?关键
是量太大了。。。 |
s***o 发帖数: 2191 | 2 parellel stream
or
去jobhunting版问好虫(jobhuntinger) :)
.
【在 c********w 的大作中提到】 : 需要做个简单的Java工具,input(从Unix pipeline 来) 是一千多万行binary msg. : 每行被decode 成一个json msg.然后apply 一个filter 最后输出所有matching json : data. : 如果decoder 和 filter 已经很优化了。有什么方法或者工具可以提高处理速度?关键 : 是量太大了。。。
|
m****o 发帖数: 182 | 3 这是典型的stream programming,直接上rxjava就好了。
.
【在 c********w 的大作中提到】 : 需要做个简单的Java工具,input(从Unix pipeline 来) 是一千多万行binary msg. : 每行被decode 成一个json msg.然后apply 一个filter 最后输出所有matching json : data. : 如果decoder 和 filter 已经很优化了。有什么方法或者工具可以提高处理速度?关键 : 是量太大了。。。
|
m****o 发帖数: 182 | 4 parallel stream不太方便控制线程池,大批量处理数据不是很推荐。
【在 s***o 的大作中提到】 : parellel stream : or : 去jobhunting版问好虫(jobhuntinger) :) : : .
|
c*********e 发帖数: 16335 | 5 node.js 啊
.
【在 c********w 的大作中提到】 : 需要做个简单的Java工具,input(从Unix pipeline 来) 是一千多万行binary msg. : 每行被decode 成一个json msg.然后apply 一个filter 最后输出所有matching json : data. : 如果decoder 和 filter 已经很优化了。有什么方法或者工具可以提高处理速度?关键 : 是量太大了。。。
|
h**********c 发帖数: 4120 | 6 一千多万行binary msg.
binary msg 分行,怎么encode的?
从优化角度讲,存到数据库会比加瓦快,数据库对硬盘优化,有自带的并行功能。 |
h**********c 发帖数: 4120 | 7 这种优化主要是pagination,从而提高 locaclity, improve cache hit rate
【在 h**********c 的大作中提到】 : 一千多万行binary msg. : binary msg 分行,怎么encode的? : 从优化角度讲,存到数据库会比加瓦快,数据库对硬盘优化,有自带的并行功能。
|
c********w 发帖数: 308 | 8 binary msg是从一个messaging bus query来的,然后pipe进一个简单的Java程序来
decode 和filter. 需要一个quick solution.
目前不需要数据库。node js看了下好像也有点overkill.主要我也不熟悉。不知道如何
快速入手。
rxjava似乎更符合一个quick solution的要求。就是embed java library. 上手应该快
。它是不是主要就是一个pattern,进一步抽象了concurrency编程。thread pool 好像
可以grow as needed. |
m****o 发帖数: 182 | 9 rxjava需要实现多线程处理的话,需要把每个操作flatmap到一个observable上去,然
后subscribeOn一个scheduler,你的情况可以直接上库自带的computational
scheduler。注意flatmap是不考虑输入输出顺序统一的,这一点很其他主流fp库设计不
同容易搞错。如果需要输入输出顺序统一,需要用concatmap。
另外好像你不是很在乎使用全局线程池,那样的话可以直接用前面有人提到的java8原
生的parallelMap。
val in = new BufferedReader(new InputStreamReader(System.in))
try {
val stream = in.lines()
try stream.parallelMap(data => ???).filter(data => ???).forEach{/* here is
your sink */}
finally stream.close()
}
finally {
in.close()
} |
d***a 发帖数: 13752 | 10 输入的行与行之间,在decode和filter时有依赖性吗?
.
【在 c********w 的大作中提到】 : 需要做个简单的Java工具,input(从Unix pipeline 来) 是一千多万行binary msg. : 每行被decode 成一个json msg.然后apply 一个filter 最后输出所有matching json : data. : 如果decoder 和 filter 已经很优化了。有什么方法或者工具可以提高处理速度?关键 : 是量太大了。。。
|
|
|
c********w 发帖数: 308 | 11 每个msg之间没有联系。所以可以并行处理
【在 d***a 的大作中提到】 : 输入的行与行之间,在decode和filter时有依赖性吗? : : .
|
d***a 发帖数: 13752 | 12 那就简单,写一个简单的pipe分流程序,把一个pipe的数据用round robin的方式写到N
个pipe里,后者的每个pipe再输入到一个单独运行的decoder & filter进程。在N核的
机器上,应该能有接近N倍的加速比。
【在 c********w 的大作中提到】 : 每个msg之间没有联系。所以可以并行处理
|
c*********e 发帖数: 16335 | 13 你是用的multi-threading还是async ?
到N
【在 d***a 的大作中提到】 : 那就简单,写一个简单的pipe分流程序,把一个pipe的数据用round robin的方式写到N : 个pipe里,后者的每个pipe再输入到一个单独运行的decoder & filter进程。在N核的 : 机器上,应该能有接近N倍的加速比。
|
c********w 发帖数: 308 | 14 他这个就是multi process吧。比起multi-threading 用一个decoder process.
【在 c*********e 的大作中提到】 : 你是用的multi-threading还是async ? : : 到N
|
d***a 发帖数: 13752 | 15 对,这个是multi-process的做法。用Java multi-threading也可以,但async的做法感
觉不好。
【在 c********w 的大作中提到】 : 他这个就是multi process吧。比起multi-threading 用一个decoder process.
|
h**********c 发帖数: 4120 | 16 我就想问二进制,换行怎么标示的,行中间碰巧有换行符号,不就不自洽了吗? |
h**********c 发帖数: 4120 | 17 说到底,这个framework那个framework,没有CPU benchmark都是扯淡 |
x***4 发帖数: 1815 | 18 正解。根本就不知道楼主的bottle在哪里。先来个profile吧。
【在 h**********c 的大作中提到】 : 说到底,这个framework那个framework,没有CPU benchmark都是扯淡
|
d***a 发帖数: 13752 | 19 如果Unix pipe的前端是数据文件,每个二进制记录的长度固定,那有更简单的做法。
用Unix的split命令把数据文件分成N份,然后启动N个进程处理相应的数据文件,最后
再把N个输出文件合并,就行了。可以手动,或者可以写一个bash或Python脚本,
20分钟就能搞定。
【在 d***a 的大作中提到】 : 对,这个是multi-process的做法。用Java multi-threading也可以,但async的做法感 : 觉不好。
|
h**********c 发帖数: 4120 | 20 这个题说老实话,应该是老魏擅长,单机,暂时不分布,
应该搞DMA,kernel bypass.
原题给的也不是很清楚,几千行是连续的还是断续的 |
|
|
h**********c 发帖数: 4120 | 21 一个好的profiler,现在基本在market上找不到。
叔也不好再多说了,但不说也不行,不能眼睁睁看车开沟里去。尤其我们自己也在车上。
【在 x***4 的大作中提到】 : 正解。根本就不知道楼主的bottle在哪里。先来个profile吧。
|
c********w 发帖数: 308 | 22 没想到JAVA multithreading这么慢。试了executorService threadpool,一个thread是
最快的。thread越多越慢。就算是线程数和processor数一样也没用。试了Java8
stream. parallel stream 比stream慢好多。刚开始觉得是我code的问题。可是我就算
是用简单的计算来代替真正的decoder,还是parallel, multithreading slower.。。。
最快的是用non parallel Stream.并行就是不行. 最后只能再试试multi processing.
profile 过了,没有明显的bottle neck. |
s***o 发帖数: 2191 | 23 感觉不太对啊,这个数据量parallel应该有帮助。input是读file还是都在memory里面
了?
。。
【在 c********w 的大作中提到】 : 没想到JAVA multithreading这么慢。试了executorService threadpool,一个thread是 : 最快的。thread越多越慢。就算是线程数和processor数一样也没用。试了Java8 : stream. parallel stream 比stream慢好多。刚开始觉得是我code的问题。可是我就算 : 是用简单的计算来代替真正的decoder,还是parallel, multithreading slower.。。。 : 最快的是用non parallel Stream.并行就是不行. 最后只能再试试multi processing. : profile 过了,没有明显的bottle neck.
|
w**z 发帖数: 8232 | 24 并行反而慢,那就或者是 Io,或者是 memory 的问题了。 |
d***a 发帖数: 13752 | 25 可能程序或运行环境有问题,需要针对并行性能来调一下。数据量足够大了,并且没有
计算依赖性,这是非常适合并行的情况。
多进程运行的做法,只是为了短平快,这种特殊情况下可以一下子搞定,但性能上不会
比自己写并行程序更快(在最好情况下)。
。。
【在 c********w 的大作中提到】 : 没想到JAVA multithreading这么慢。试了executorService threadpool,一个thread是 : 最快的。thread越多越慢。就算是线程数和processor数一样也没用。试了Java8 : stream. parallel stream 比stream慢好多。刚开始觉得是我code的问题。可是我就算 : 是用简单的计算来代替真正的decoder,还是parallel, multithreading slower.。。。 : 最快的是用non parallel Stream.并行就是不行. 最后只能再试试multi processing. : profile 过了,没有明显的bottle neck.
|
c********w 发帖数: 308 | 26 加了Xmx和concurMarkAndSweep..多线程还是不行。网上查了下这种情况也不少,可以
是运算不够复杂,不足以掩盖threading overhead...
现在多进程的做的差不多了,就是你前面说的python break up input into multiple
chunks 然后启动多个Java进程并行处理,最后write to separate files. Python再合
并成一个。效果不错。估计就只能这样了。不知道怎么再调多线程了。。
【在 d***a 的大作中提到】 : 可能程序或运行环境有问题,需要针对并行性能来调一下。数据量足够大了,并且没有 : 计算依赖性,这是非常适合并行的情况。 : 多进程运行的做法,只是为了短平快,这种特殊情况下可以一下子搞定,但性能上不会 : 比自己写并行程序更快(在最好情况下)。 : : 。。
|
c*********e 发帖数: 16335 | 27 你用的啥计算机啊?几个core? 每个内存多大?
。。
【在 c********w 的大作中提到】 : 没想到JAVA multithreading这么慢。试了executorService threadpool,一个thread是 : 最快的。thread越多越慢。就算是线程数和processor数一样也没用。试了Java8 : stream. parallel stream 比stream慢好多。刚开始觉得是我code的问题。可是我就算 : 是用简单的计算来代替真正的decoder,还是parallel, multithreading slower.。。。 : 最快的是用non parallel Stream.并行就是不行. 最后只能再试试multi processing. : profile 过了,没有明显的bottle neck.
|