fix
This commit is contained in:
parent
6232d7dee9
commit
c1f58a27e1
|
@ -8,7 +8,7 @@ import scala.reflect.ClassTag
|
||||||
|
|
||||||
object Task1 {
|
object Task1 {
|
||||||
|
|
||||||
def parTraverse[A, B: ClassTag](parallelism: Int)(xs: List[A])(fa: A => Future[B])(
|
def parTraverse[A, B](parallelism: Int)(xs: List[A])(fa: A => Future[B])(
|
||||||
implicit ex: ExecutionContext
|
implicit ex: ExecutionContext
|
||||||
): Future[List[B]] = {
|
): Future[List[B]] = {
|
||||||
if (xs.isEmpty) {
|
if (xs.isEmpty) {
|
||||||
|
@ -24,7 +24,7 @@ object Task1 {
|
||||||
case scala.util.Success(value) =>
|
case scala.util.Success(value) =>
|
||||||
results(index) = Some(value)
|
results(index) = Some(value)
|
||||||
if (counter.incrementAndGet() == xs.size) {
|
if (counter.incrementAndGet() == xs.size) {
|
||||||
promise.success(results.flatten.toList)
|
promise.success(results.toList.flatten)
|
||||||
} else {
|
} else {
|
||||||
process(index + parallelism)
|
process(index + parallelism)
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ package mipt.homework9
|
||||||
|
|
||||||
import mipt.utils.Homeworks._
|
import mipt.utils.Homeworks._
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.Semaphore
|
||||||
import scala.concurrent.ExecutionContext.Implicits.global
|
import scala.concurrent.ExecutionContext.Implicits.global
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
|
|
||||||
|
@ -14,29 +14,13 @@ object Task3 {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object EventLoopManager {
|
private val guard_runner = new Semaphore(1)
|
||||||
private val queue = new java.util.concurrent.ConcurrentLinkedQueue[(() => Any, Any => Unit)]()
|
|
||||||
private val isProcessing = new AtomicBoolean(false)
|
|
||||||
|
|
||||||
def addToQueue(task: (() => Any, Any => Unit)): Unit = {
|
|
||||||
queue.add(task)
|
|
||||||
|
|
||||||
if (isProcessing.compareAndSet(false, true)) {
|
|
||||||
processQueue()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private def processQueue(): Unit = Future {
|
|
||||||
while (!queue.isEmpty) {
|
|
||||||
val (task, callback) = queue.poll()
|
|
||||||
val result = task()
|
|
||||||
callback(result)
|
|
||||||
}
|
|
||||||
isProcessing.set(false)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
def eventLoop[A, B](f: A => B): EventLoopFunction[A, B] = (a: A, cb: B => Unit) => {
|
def eventLoop[A, B](f: A => B): EventLoopFunction[A, B] = (a: A, cb: B => Unit) => {
|
||||||
EventLoopManager.addToQueue((() => f(a), cb.asInstanceOf[Any => Unit]))
|
guard_runner.acquire()
|
||||||
|
Future(f(a)).onComplete[Unit](x => {
|
||||||
|
x.map(cb)
|
||||||
|
guard_runner.release()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue