Homework 8
package mipt.homework8
import scala.concurrent.{ExecutionContext, Future}
import mipt.utils.Homeworks._
object Task1 {
def foldF[A, B](in: Seq[Future[A]], zero: B, op: (B, A) => B)(
implicit executionContext: ExecutionContext
): Future[B] =
Реализуйте функцию, которая выполнит свертку (fold) входящей последовательности из Future,
используя переданный комбинатор и начальное значение для свертки.
Если какая-либо из исходных Future зафейлилась, то должна вернуться ошибка от нее
""" (1, 1)
def flatFoldF[A, B](in: Seq[Future[A]], zero: B, op: (B, A) => Future[B])(
implicit executionContext: ExecutionContext
): Future[B] =
Реализуйте функцию, которая выполнит свертку (fold) входящей последовательности из Future,
используя переданный асинхронный комбинатор и начальное значение для свертки.
Если какая-либо из исходных Future зафейлилась, то должна вернуться ошибка от нее.
Если комбинатор зафейлился, то должна вернуться ошибка от него.
""" (1, 2)
def fullSequence[A](futures: List[Future[A]])(
implicit ex: ExecutionContext
): Future[(List[A], List[Throwable])] =
В данном задании Вам предлагается реализовать функцию fullSequence,
похожую на Future.sequence, но в отличии от нее,
возвращающую все успешные и не успешные результаты.
Возвращаемое тип функции - кортеж из двух списков,
в левом хранятся результаты успешных выполнений,
в правово результаты неуспешных выполнений.
Не допускается использование методов объекта Await и мутабельных переменных var
""" (1, 3)
def traverse[A, B](in: List[A])(fn: A => Future[B])(
implicit ex: ExecutionContext
): Future[List[B]] =
Реализуйте traverse c помощью метода Future.sequence
""" (1, 4)
def mapReduce[A, B, B1 >: B](in: List[A], map: A => Future[B], reduce: (B1, B1) => B1)(
implicit ex: ExecutionContext
): Future[B1] =
Реализуйте алгоритм map/reduce.
Исходный список обрабатывается параллельно (конкурентно) с помощью применения функции map к каждому элементу
Результаты работы функции map должны быть свернуты в одно значение функцией reduce
Если в ходе выполнения какой-либо операции возникло исключение - эту обработку нужно игнорировать
Если ни один вызов map не завершился успешно, вернуть зафейленную фьючу с исключением UnsupportedOperationException
""" (1, 5)
Normal file
Normal file
@ -0,0 +1,181 @@
package mipt.homework8
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import mipt.homework8.Task1._
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
class Task1Spec extends AnyFlatSpec with Matchers {
"foldF" should "fold list of futures into a single future using combining function" in new WithGlobalExecutionContext {
val input = Seq(1, 2, 3, 4, 5, 6).map(Future.successful)
await(foldF[Int, Int](input, 0, _ + _)) shouldBe 21
await(foldF[Int, Int](input, 1, _ * _)) shouldBe 720
"flatFoldF" should "fold list of futures into a single future using async combining function" in new WithGlobalExecutionContext {
val input = Seq(1, 2, 3, 4, 5, 6).map(Future.successful)
val sum = (a: Int, b: Int) => Future.successful(a + b)
val product = (a: Int, b: Int) => Future.successful(a * b)
await(flatFoldF(input, 0, sum)) shouldBe 21
await(flatFoldF(input, 1, product)) shouldBe 720
"full sequence" should "process list of success futures" in new WithLimitedExecutionContext {
* best answer will process task with 9 runnable
* good answer will process task with 12 runnable
* satisfied answer will process task with any number of runnable
* choose which one you want
* */
val limit = 100
implicit val exec: ExecutionContext = limitedExec(limit)
val fut1 = fut(1)
val fut2 = fut(2)
val fut3 = fut(3)
assert(await(fullSequence[Int](List(fut1, fut2, fut3))) === (List(1, 2, 3), List()))
it should "process list of success and failures" in new WithLimitedExecutionContext {
* best answer will process task with 7 runnable
* good answer will process task with 8 runnable
* satisfied answer will process task with any number of runnable
* choose which one you want
* */
val limit = 100
implicit val exec: ExecutionContext = limitedExec(limit)
val ex1 = new Exception("ex1")
val ex2 = new Exception("ex2")
val failed1 = Future.failed(ex1)
val failed2 = Future.failed(ex2)
val fut1 = fut(1)
assert(await(fullSequence[Int](List(fut1, failed1, failed2))) === (List(1), List(ex1, ex2)))
it should "process list of failures" in new WithLimitedExecutionContext {
* best answer will process task with 4 runnable
* satisfied answer will process task with any number of runnable
* choose which one you want
* */
val limit = 100
implicit val exec: ExecutionContext = limitedExec(limit)
val ex1 = new Exception("ex1")
val ex2 = new Exception("ex2")
val failed1 = Future.failed(ex1)
val failed2 = Future.failed(ex2)
assert(await(fullSequence[Int](List(failed1, failed2))) === (List(), List(ex1, ex2)))
"traverse via sequence" should "behave as a scala Future.traverse" in new WithGlobalExecutionContext {
val xs = (1 to 10).toList
val result = await(traverse(xs)(fut))
assert(result === await(Future.traverse(xs)(fut)))
assert(result === xs)
it should "work with empty lists" in new WithGlobalExecutionContext {
assert(await(traverse(Nil)(fut)) === Nil)
it should "correctly stop on failures" in new WithGlobalExecutionContext {
case class MyError() extends Exception
val xs = (1 to 100).toList
try {
await(traverse(xs)(v => {
if (v == 42) throw MyError();
} catch {
case MyError() => assert(true)
"mapReduce" should "asynchronously map and reduce the list and skip failures" in new WithGlobalExecutionContext {
val xs = (1 to 10).toList
val predicate: Int => Boolean = a => a == 5 || a == 6
val map: Int => Future[Int] = a =>
if (predicate(a))
Future.failed(new RuntimeException)
else Future.successful(a * 2)
val reduce: (Int, Int) => Int = _ + _
await(mapReduce(xs, map, reduce)) shouldBe xs.filterNot(predicate).map(_ * 2).sum
it should "throw UnsupportedOperationException if all elements mapping is failed" in new WithGlobalExecutionContext {
val xs = (1 to 10).toList
val map: Int => Future[Int] = _ => Future.failed(new RuntimeException)
val reduce: (Int, Int) => Int = _ + _
try {
await(mapReduce(xs, map, reduce))
} catch {
case _: UnsupportedOperationException => assert(true)
case _: Throwable => assert(false)
def await[A](future: Future[A]): A = Await.result(future, Duration.Inf)
def fut(i: Int)(implicit ex: ExecutionContext): Future[Int] = Future {
trait WithLimitedExecutionContext {
def limitedExec(limit: Int): ExecutionContext = new ExecutionContext {
val counter = new AtomicInteger(0)
val global: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
override def execute(runnable: Runnable): Unit = {
if (counter.get() > limit) {
throw new Exception("Runnable limit reached, You can do better :)")
} else {
override def reportFailure(cause: Throwable): Unit = ???
trait WithGlobalExecutionContext {
implicit val executionContext: ExecutionContext = scala.concurrent.ExecutionContext.global
