How to use multiple threads in Java to process a large number of files stored on the local disk (using a file lock)

Note: this page mirrors a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. If you reuse this content, you must do so under the same CC BY-SA license and attribute it to the original authors (not me): StackOverflow

Original: http://stackoverflow.com/questions/1442720/
How to use multiple threads to process a large number of files stored on the local disk (using a file lock)
Asked by paxdiablo
How do I use multiple threads in Java to process a large number of files stored in a local disk directory (using a file lock)?
Answered by gustafc
You don't want to read the files in parallel (disk I/O doesn't parallelize well). Better to let a single thread read the files, send the contents off to worker threads for parallel processing, and then collect the results from the workers. Using the excellent ExecutorService and friends from java.util.concurrent spares you the dirty details of threading and makes your solution far more flexible.
Here's a simple example. Assuming Foo is the result of processing a file:
public List<Foo> processFiles(Iterable<File> files) throws Exception {
    List<Future<Foo>> futures = new ArrayList<Future<Foo>>();
    ExecutorService exec = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors());
    try {
        for (File f : files) {
            final byte[] bytes = readAllBytes(f); // defined elsewhere
            futures.add(exec.submit(new Callable<Foo>() {
                public Foo call() throws Exception {
                    InputStream in = new ByteArrayInputStream(bytes);
                    return readFoo(in); // read a Foo object from "in"; defined elsewhere
                }
            }));
        }
        List<Foo> foos = new ArrayList<Foo>(futures.size());
        for (Future<Foo> f : futures) foos.add(f.get());
        return foos;
    } finally {
        exec.shutdown();
    }
}
TODO: Add exception handling etc. You may also want to instantiate the ExecutorService outside of processFiles so you can reuse it between calls.
Answered by paxdiablo
The best way I know of doing it (in any language, not just Java) is to use a producer/multi-consumer paradigm.
Have one thread create a queue, then start up N other threads. This main thread will then enumerate all the files and place their names on that queue. Then it will place N end-of-queue markers on the queue.
The "other" threads simply read the next name off that queue and process the file. When they read off a end-of-queue marker, they exit (and the main thread can reap their exit status if need be).
“其他”线程只是从该队列中读取下一个名称并处理该文件。当它们读取队列结束标记时,它们退出(如果需要,主线程可以获取它们的退出状态)。
This simplifies the communication between threads to the queue (which should, of course, be protected by a mutex so as not to cause race conditions among the threads). It also allows the threads to control their own exit condition (under direction from the main thread), another good way to avoid certain multi-threading problems.
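In Java, the queue and its mutex come ready-made as a BlockingQueue, so the paradigm above can be sketched as follows. This is a hedged illustration, not paxdiablo's exact design: the file names are made up, and a counter stands in for real per-file processing. An empty string serves as the end-of-queue marker because a BlockingQueue rejects null.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerConsumerDemo {
    private static final String END_OF_QUEUE = ""; // sentinel; a BlockingQueue rejects null

    public static int run(List<String> fileNames, int nWorkers) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
        AtomicInteger processed = new AtomicInteger();
        Thread[] workers = new Thread[nWorkers];
        for (int i = 0; i < nWorkers; i++) {
            workers[i] = new Thread(() -> {
                try {
                    String name;
                    // read names off the queue until an end-of-queue marker arrives
                    while (!(name = queue.take()).equals(END_OF_QUEUE)) {
                        processed.incrementAndGet(); // stand-in for processing the file
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (String name : fileNames) queue.put(name);              // main thread enumerates the files
        for (int i = 0; i < nWorkers; i++) queue.put(END_OF_QUEUE); // N markers, one per worker
        for (Thread w : workers) w.join();                          // reap the workers
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList("a.txt", "b.txt", "c.txt"), 2));
    }
}
```

Because each worker consumes exactly one marker before exiting, the main thread must enqueue as many markers as there are workers.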
Answered by Chad Okere
Here's how I usually do it.
You can create a blocking queue like this:

    LinkedBlockingQueue<String> files = new LinkedBlockingQueue<String>(1000);
    // Sentinel used to tell workers to stop; a BlockingQueue does not accept null.
    final String DONE_MARKER = "";
    AtomicBoolean done = new AtomicBoolean(false);

The queue can only hold 1000 elements, so if you somehow have a billion files, you don't have to worry about running out of memory. You can change the capacity based on how much memory you want to use.

In your main thread you do something like:

    File directory = new File("path\\to\\folder");
    for (File file : directory.listFiles()) {
        files.put(file.getAbsolutePath());
    }
    files.put(DONE_MARKER); // this last entry tells the worker threads to stop

The put method blocks until space becomes available in the queue, so if the queue fills up, the main thread stops reading file names. Of course, because File.listFiles() actually returns an array rather than a lazily populated Collection, you still end up loading the complete list of files into memory if you use this method. If that turns out to be a problem, you'll have to do something else.

But this model also works if you have some other way of listing files (for example, if they're all in a database). Just replace the call to directory.listFiles() with whatever you use to get your file list. Also, if you have to process files in sub-directories, you'll have to walk them recursively, which can be annoying (but this gets around the memory issue for extremely large directories).

Then in your worker threads:

    public void run() {
        try {
            while (!done.get()) {
                String filename = files.take();
                if (DONE_MARKER.equals(filename)) {
                    done.set(true); // signal to the other threads that we found the final element
                    files.put(DONE_MARKER); // put the marker back so the other workers see it too
                } else {
                    // do stuff with your file.
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

If all the files in the queue have been processed, take() will block until new elements show up.

That's the basic idea anyway; this code is off the top of my head and hasn't been tested exactly as is.
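For the sub-directory case mentioned above, a recursive walk can hand out file names as it discovers them. A hedged sketch, under the assumption that in the queue-based design the out.add(...) call would become files.put(...) so the full listing never sits in memory at once:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class RecursiveLister {
    // Walks the tree depth-first. Here names are collected into a list for
    // demonstration; in the queue-based design, out.add(path) would instead
    // be files.put(path), streaming names to the workers as they are found.
    public static void walk(File dir, List<String> out) {
        File[] entries = dir.listFiles();
        if (entries == null) return; // not a directory, or an I/O error occurred
        for (File e : entries) {
            if (e.isDirectory()) walk(e, out);
            else out.add(e.getAbsolutePath());
        }
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<String>();
        walk(new File("."), names);
        System.out.println(names.size() + " files found");
    }
}
```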
Answered by VHS
With Java 8, you can easily achieve this using parallel streams. See the following code snippet:
try {
    Files.walk(Paths.get("some-path"))
            .parallel()
            .forEach(file -> { /* do your processing */ });
} catch (IOException e1) {
    e1.printStackTrace();
}
With a parallel stream, the runtime will spawn the required number of threads, not exceeding the number of logical CPU cores, to process the collection elements (files, in our case) in parallel. You can also control the number of threads by passing a JVM argument.
The advantage of this approach is that you don't have to do any of the low-level work of creating and maintaining threads; you just focus on your high-level problem.
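As a concrete example of that JVM knob: parallel streams run on the common ForkJoinPool, whose size is governed by the java.util.concurrent.ForkJoinPool.common.parallelism system property. It can be set on the command line (-Djava.util.concurrent.ForkJoinPool.common.parallelism=4) or, as sketched below, programmatically before the pool is first touched. The value 4 is arbitrary here.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ParallelismDemo {
    public static void main(String[] args) {
        // Must run before anything uses the common pool (parallel streams do).
        System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "4");
        IntStream.range(0, 100).parallel().sum(); // force the common pool into use
        System.out.println(ForkJoinPool.commonPool().getParallelism());
    }
}
```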
Answered by Amarjeet
I am working on a similar problem where I have to process a few thousand text files. I have a file poller which polls a directory, prepares a list of the files found in it (including sub-directories), and calls a method, say fileFound, with the list as an argument.
In the fileFound method, I iterate over the list and submit a new task for each file, using an ExecutorService to control the number of active threads. The code goes like this:
private static final ExecutorService EXECUTOR = Executors.newFixedThreadPool(10);

public void fileFound(List<File> fileList) {
    for (File file : fileList) {
        // FileProcessor.run() implements the business rules for the file.
        FileProcessor fprocessor = new FileProcessor(file);
        EXECUTOR.submit(fprocessor);
    }
}
My observations:
- Processing 3.5K files (~32 GB total) one by one, without multi-threading, took ~9 hours.
- Using multi-threading:
  - With 5 threads: 118 minutes.
  - With 10 threads: 75 minutes.
  - With 15 threads: 72 minutes.
Answered by dj_segfault
A lot of the leg work has been done for you in the Java Concurrency classes. You probably want something like ConcurrentLinkedQueue.
An unbounded thread-safe queue based on linked nodes. This queue orders elements FIFO (first-in-first-out). The head of the queue is that element that has been on the queue the longest time. The tail of the queue is that element that has been on the queue the shortest time. New elements are inserted at the tail of the queue, and the queue retrieval operations obtain elements at the head of the queue. A ConcurrentLinkedQueue is an appropriate choice when many threads will share access to a common collection.
You use the offer() method to put entries on the queue, either in the main thread or a separate thread. Then you have a bunch of worker bees (ideally created in something like ExecutorService) that use the poll() method to pull the next entry out of the queue and process it.
Using this design gives you incredible flexibility in determining how many producers and how many consumers run concurrently, without having to do any waiting/polling code yourself. You can create your pool of minions using Executors.newFixedThreadPool().
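A hedged sketch of that offer()/poll() design, with illustrative names and a counter standing in for real file processing. One subtlety: poll() returns null both when the queue is merely momentarily empty and when all work is done, so the workers here also watch a producer-finished flag; the price is a busy-wait that a BlockingQueue's take() would avoid.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class QueueDemo {
    public static int run(List<String> names, int nWorkers) throws InterruptedException {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        AtomicBoolean producerDone = new AtomicBoolean(false);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(nWorkers);
        for (int i = 0; i < nWorkers; i++) {
            pool.execute(() -> {
                String name;
                // keep polling until the queue is empty AND the producer has finished;
                // a null from poll() only means "nothing available right now"
                while ((name = queue.poll()) != null || !producerDone.get()) {
                    if (name != null) processed.incrementAndGet(); // stand-in for processing
                }
            });
        }
        for (String n : names) queue.offer(n); // producer side
        producerDone.set(true);                // set only after the last offer()
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList("a", "b", "c", "d"), 2));
    }
}
```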
Answered by Tim Bender
What you really want to do is have your main program traverse the directory getting File references. Use those references to create objects which implement Runnable; the run() method of each Runnable holds all of your processing logic. Create an ExecutorService and call execute(Runnable) to submit the tasks to the executor service. The Executor will run the tasks as threads become available, based on the type of Executor you create (Executors.newFixedThreadPool() is a good choice). When your main thread has found all of the files and submitted them as tasks, call shutdown() on the Executor and then call [awaitTermination()][6]. Calling shutdown() tells the executor to finish executing the tasks it was given and then close; calling awaitTermination() causes your main thread to block until the Executor shuts down. That of course assumes you want to wait for all tasks to finish and then do more processing.
[6]: http://java.sun.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination(long, java.util.concurrent.TimeUnit)
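A minimal sketch of that submit/shutdown/awaitTermination sequence, under the assumption that the per-file work is represented by a counter (the file names below are hypothetical and never actually opened):

```java
import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TraverseDemo {
    public static int processAll(File[] files) throws InterruptedException {
        ExecutorService exec = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        AtomicInteger processed = new AtomicInteger();
        for (File f : files) {
            exec.execute(() -> {
                // all of the per-file processing logic would go here
                processed.incrementAndGet();
            });
        }
        exec.shutdown();                            // accept no new tasks, finish submitted ones
        exec.awaitTermination(1, TimeUnit.MINUTES); // block until the executor shuts down
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        File[] fake = { new File("a.txt"), new File("b.txt") }; // hypothetical; never opened
        System.out.println(processAll(fake));
    }
}
```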

