Possibly fixes #438

This commit is contained in:
Jesse Boyd 2017-03-04 18:22:11 +11:00
parent cb2f9ebf11
commit 5ead47ba2f
No known key found for this signature in database
GPG Key ID: 59F1DE6293AF6E1F
10 changed files with 55 additions and 34 deletions

View File

@ -247,8 +247,8 @@ public class BukkitQueue_1_10 extends BukkitQueue_0<net.minecraft.server.v1_10_R
}
@Override
public boolean next(int amount, ExecutorCompletionService pool, long time) {
return super.next(amount, pool, time);
public boolean next(int amount, long time) {
return super.next(amount, time);
}
@Override

View File

@ -247,8 +247,8 @@ public class BukkitQueue_1_11 extends BukkitQueue_0<net.minecraft.server.v1_11_R
}
@Override
public boolean next(int amount, ExecutorCompletionService pool, long time) {
return super.next(amount, pool, time);
public boolean next(int amount, long time) {
return super.next(amount, time);
}
@Override

View File

@ -11,6 +11,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class DefaultFaweQueueMap implements IFaweQueueMap {
@ -116,7 +119,7 @@ public class DefaultFaweQueueMap implements IFaweQueueMap {
private int lastZ = Integer.MIN_VALUE;
@Override
public boolean next(int amount, ExecutorCompletionService pool, long time) {
public boolean next(int amount, long time) {
synchronized (blocks) {
try {
boolean skip = parent.getStage() == SetQueue.QueueStage.INACTIVE;
@ -139,19 +142,21 @@ public class DefaultFaweQueueMap implements IFaweQueueMap {
}
} while (System.currentTimeMillis() - start < time);
} else {
ExecutorCompletionService service = SetQueue.IMP.getCompleterService();
ForkJoinPool pool = SetQueue.IMP.getForkJoinPool();
boolean result = true;
// amount = 8;
for (int i = 0; i < amount && (result = iter.hasNext()); i++, added++) {
for (int i = 0; i < amount && (result = iter.hasNext()); i++) {
Map.Entry<Long, FaweChunk> item = iter.next();
FaweChunk chunk = item.getValue();
if (skip && chunk == lastWrappedChunk) {
i--;
added--;
continue;
}
iter.remove();
parent.start(chunk);
pool.submit(chunk);
service.submit(chunk);
added++;
}
// if result, then submitted = amount
if (result) {
@ -165,14 +170,19 @@ public class DefaultFaweQueueMap implements IFaweQueueMap {
}
iter.remove();
parent.start(chunk);
pool.submit(chunk);
FaweChunk fc = ((FaweChunk) pool.take().get());
parent.end(fc);
service.submit(chunk);
Future future = service.poll(50, TimeUnit.MILLISECONDS);
if (future != null) {
FaweChunk fc = (FaweChunk) future.get();
parent.end(fc);
}
}
}
}
for (int i = 0; i < added; i++) {
FaweChunk fc = ((FaweChunk) pool.take().get());
pool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
Future future;
while ((future = service.poll()) != null) {
FaweChunk fc = (FaweChunk) future.get();
parent.end(fc);
}
}

View File

@ -3,7 +3,6 @@ package com.boydti.fawe.example;
import com.boydti.fawe.object.FaweChunk;
import com.boydti.fawe.object.RunnableVal;
import java.util.Collection;
import java.util.concurrent.ExecutorCompletionService;
public interface IFaweQueueMap {
@ -21,5 +20,5 @@ public interface IFaweQueueMap {
int size();
boolean next(int size, ExecutorCompletionService dispatcher, long time);
boolean next(int size, long time);
}

View File

@ -21,7 +21,6 @@ import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
@ -213,8 +212,8 @@ public abstract class MappedFaweQueue<WORLD, CHUNK, CHUNKSECTIONS, SECTION> exte
}
@Override
public boolean next(int amount, ExecutorCompletionService pool, long time) {
return map.next(amount, pool, time);
public boolean next(int amount, long time) {
return map.next(amount, time);
}
public void start(FaweChunk chunk) {

View File

@ -14,6 +14,9 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class WeakFaweQueueMap implements IFaweQueueMap {
@ -149,7 +152,7 @@ public class WeakFaweQueueMap implements IFaweQueueMap {
private int lastZ = Integer.MIN_VALUE;
@Override
public boolean next(int amount, ExecutorCompletionService pool, long time) {
public boolean next(int amount, long time) {
synchronized (blocks) {
try {
boolean skip = parent.getStage() == SetQueue.QueueStage.INACTIVE;
@ -179,6 +182,8 @@ public class WeakFaweQueueMap implements IFaweQueueMap {
} while (System.currentTimeMillis() - start < time);
return !blocks.isEmpty();
}
ExecutorCompletionService service = SetQueue.IMP.getCompleterService();
ForkJoinPool pool = SetQueue.IMP.getForkJoinPool();
boolean result = true;
// amount = 8;
for (int i = 0; i < amount && (result = iter.hasNext());) {
@ -191,7 +196,7 @@ public class WeakFaweQueueMap implements IFaweQueueMap {
iter.remove();
if (chunk != null) {
parent.start(chunk);
pool.submit(chunk);
service.submit(chunk);
added++;
i++;
} else {
@ -212,15 +217,20 @@ public class WeakFaweQueueMap implements IFaweQueueMap {
iter.remove();
if (chunk != null) {
parent.start(chunk);
pool.submit(chunk);
FaweChunk fc = ((FaweChunk) pool.take().get());
parent.end(fc);
service.submit(chunk);
Future future = service.poll(50, TimeUnit.MILLISECONDS);
if (future != null) {
FaweChunk fc = (FaweChunk) future.get();
parent.end(fc);
}
}
}
}
}
for (int i = 0; i < added; i++) {
FaweChunk fc = ((FaweChunk) pool.take().get());
pool.awaitQuiescence(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
Future future;
while ((future = service.poll()) != null) {
FaweChunk fc = (FaweChunk) future.get();
parent.end(fc);
}
} catch (Throwable e) {

View File

@ -14,7 +14,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
public class MCAQueueMap implements IFaweQueueMap {
@ -150,7 +149,7 @@ public class MCAQueueMap implements IFaweQueueMap {
}
@Override
public boolean next(int size, ExecutorCompletionService dispatcher, long time) {
public boolean next(int size, long time) {
lastX = Integer.MIN_VALUE;
lastZ = Integer.MIN_VALUE;
lastFileX = Integer.MIN_VALUE;

View File

@ -314,14 +314,14 @@ public abstract class FaweQueue implements HasFaweQueue {
int amount = Settings.IMP.QUEUE.PARALLEL_THREADS;
ExecutorCompletionService service = SetQueue.IMP.getCompleterService();
long time = 20; // 30ms
return next(amount, service, time);
return next(amount, time);
}
/**
* Gets the FaweChunk and sets the requested blocks
* @return
*/
public abstract boolean next(int amount, ExecutorCompletionService pool, long time);
public abstract boolean next(int amount, long time);
public void saveMemory() {
MainUtil.sendAdmin(BBC.OOM.s());

View File

@ -19,7 +19,6 @@ import java.util.Collection;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorCompletionService;
import javax.annotation.Nullable;
public class DelegateFaweQueue extends FaweQueue {
@ -232,8 +231,8 @@ public class DelegateFaweQueue extends FaweQueue {
}
@Override
public boolean next(int amount, ExecutorCompletionService pool, long time) {
return parent.next(amount, pool, time);
public boolean next(int amount, long time) {
return parent.next(amount, time);
}
@Override

View File

@ -54,6 +54,11 @@ public class SetQueue {
return completer;
}
@Deprecated
public ForkJoinPool getForkJoinPool() {
return pool;
}
public void runMiscTasks() {
while (Fawe.get().getTimer().isAbove(targetTPS)) {
Runnable task = tasks.poll();
@ -116,7 +121,7 @@ public class SetQueue {
boolean parallel = Settings.IMP.QUEUE.PARALLEL_THREADS > 1;
queue.startSet(parallel);
try {
if (!queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, getCompleterService(), time) && queue.getStage() == QueueStage.ACTIVE) {
if (!queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, time) && queue.getStage() == QueueStage.ACTIVE) {
queue.setStage(QueueStage.NONE);
queue.runTasks();
}
@ -216,7 +221,7 @@ public class SetQueue {
public void flush(FaweQueue queue) {
queue.startSet(Settings.IMP.QUEUE.PARALLEL_THREADS > 1);
try {
queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, getCompleterService(), Long.MAX_VALUE);
queue.next(Settings.IMP.QUEUE.PARALLEL_THREADS, Long.MAX_VALUE);
} catch (Throwable e) {
pool.awaitQuiescence(Settings.IMP.QUEUE.DISCARD_AFTER_MS, TimeUnit.MILLISECONDS);
completer = new ExecutorCompletionService(pool);