IA014: Advanced Functional Programming 10. I/O and Concurrency Jan Obdržálek obdrzalek@fi.muni.cz Faculty of Informatics, Masaryk University Brno IA014 10. I/O and Concurrency 1 Monadic I/O IA014 10. I/O and Concurrency 2 Pure programs and I/O • pure functional program (e.g. in Haskell) implements a function • no side effects • but the purpose of a program is to produce a side effect: • produce an output, send a message, modify a screen ... • the side effects have to be outside of the program IA014 10. I/O and Concurrency 3 Monadic I/O A value of type 10 a is an "action" (computation) that may do some input/output before producing a value of type a. The meaning of "do some input/output": modify the outside world type 10 a = World -> (a, World) result:: a World in World out IA014 10. I/O and Concurrency 4 The 10 monad return :: a -> 10 a a a h T return (>> = ) : : 10 a -> (a -> 10 b) -> 10 b Char getChar ► putChar getChar >>= putChar (>>) : : 10 a -> 10 b -> 10 b (>>) a b = a >>= (\x -> b) do notation can be used IA014 10. I/O and Concurrency Program as an I/O action In Haskell • the whole program defines a single big I/O action. • its type is 10 (), and • the program is executed by performing the action. Example: main :: 10 () main = do xs <- getLine putStrLn (reverse xs) The world is treated in a single-threaded way: • >>= is the only operation for combining I/O actions • the world is never duplicated or thrown away • (and the programmer cannot break this) IA014 10. I/O and Concurrency 6 Control structures 1/2 Monadic I/O lets us do imperative programming, including control structures. Examples: an infinite loop forever :: 10 () -> 10 () forever a = a » forever a • n-times loop repeatN :: Int -> 10 a -> 10 () repeatN 0 a = return () repeatN n a = a » repeatN (n-1) a Why we can do this: actions as first class values. IA014 10. I/O and Concurrency 7 Control structures 2/2 Users are free to define other kinds of control structures. Examples Let us execute a sequence of actions: sequence_ :: [10 a] -> 10 () sequence_ [] = return () sequence- [x:xs] = do x; sequence- xs Now we can define f o r as: for :: [a] -> (a -> 10 ()) -> 10 () for ns f = sequence- (map f ns) Exercise: • define while :: 10 Bool -> 10 () IA014 10. I/O and Concurrency 8 References Goal: to create mutable variables Idea: 10 operations provide us with sequentialized input/output data IORef a newIORef :: a -> 10 (IORef a) readlORef :: IORef a -> 10 a writelORef :: IORef a -> a -> 10 () Example: import Data.IORef main = do varA <- newIORef 0 a0 <- readlORef varA writelORef varA 1 al <- readlORef varA print (a0, al) Arrays, hashtables etc. are implemented in the same way IA014 10. I/O and Concurrency 9 Concurrency IA014 10. I/O and Concurrency 10 Concurrency vs parallelism Parallelism • multiple processors/cores to gain performance • no communication between processes • no semantic impact (same result if executed sequentially and in parallel) • deterministic Concurrency • concurrency as part of specification • concurrent threads, each doing I/O independently • behaviour is non-deterministic • substantial semantic impact We will use Concurrent Haskell (an extension of Haskell 2010) IA014 10. I/O and Concurrency 11 Threads • new threads are created using forklO forklO :: 10 () -> 10 ThreadID • newly created thread runs concurrently with other threads • side effects are interleaved with other threads • in GHC threads are extremely lightweight (< 100 bytes plus stack) • Haskell threads are movable - eliminates fragmentation • GHC automatically spread threads among cores IA014 10. I/O and Concurrency 12 Threads example - interleaving import Control.Concurrent import Control.Monad import System.10 main = do hSetBuffering stdout NoBuffering forklO (forever(putChar 'A')) forklO (forever(putChar 'B')) threadDelay (10"3) threadDeiay :: int -> io o suspends execution for a given number of microseconds. One possible output: AABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB BBBBBBBBBBBBBBBBBBBBBBBBBBBABABABABABAB IA014 10. I/O and Concurrency Low level communication: MVar MVar is very similar to IORef: data MVar a newEmptyMVar :: 10 (MVar a) newMVar :: a -> 10 (MVar a) takeMVar :: MVar a -> 10 a putMVar :: MVar a -> a -> 10 () readMVar :: MVar a -> 10 a • MVar can hold a value or be empty • takeMVar removes a value and blocks if MVar is empty (until MVar becomes full) • putMVa r inserts a value and blocks if MVa r is full (until MVar becomes empty) • waiting threads are in a FIFO queue • only one thread is woken up at a time IA014 10. I/O and Concurrency 14 MVar example import Control.Concurrent import System.Random main :: 10 () main = do var <- newMVar [] -- MVar to collect the results mapM_ (forklO . task var) ["first", "second", "third"] -putStrLn "Press Return to show the results." _ <- getLine takeMVar var »= mapM_ putStrLn -- print the results where task v s = do randomRIO (1,10) »= \r -> threadDelay (r * 10000) val <- takeMVar v putMVar v (s : val) IA014 10. I/O and Concurrency 15 Various uses of MVar • locks ' type MVar () is sufficient • takeMVar acquires the lock, putMVar releases or the other way round - just be consistent • one-place channels can be used for asynchronous communication • containers for shared mutable states IA014 10. I/O and Concurrency 16 Asynchronous I/O import Control.Concurrent import Data.ByteString as B import GetURL main = do ml <- newEmptyMVar m2 <- newEmptyMVar forklO $ do r <- getURL "http://www.wikipedia.org/wiki/Shove putMVar ml r forklO $ do r <- getURL "http://www.wikipedia.org/wiki/Spade putMVar m2 r rl <- takeMVar ml r2 <- takeMVar m2 print (B.length rl, B.length r2) IA014 10. I/O and Concurrency The a sync package data Async a = Async (MVar a) async :: 10 a -> 10 (Async a) async action = do var <- newEmptyMVar forklO (do r <- action; putMVar var r) return (Async var) wait :: Async a -> 10 a wait (Async var) = readMVar var Our code can now be written as: main = do al <- async (getURL "http://www.wikipedia.org/wiki/Shovel") a2 <- async (getURL "http://www.wikipedia.org/wiki/Spade") rl <- wait al r2 <- wait a2 print (B.length rl, B.length r2) IA014 10. I/O and Concurrency 18 Channels MVars can be used as a building block to construct larger abstractions. Case study: Channels Channel - unlimited buffer for communicating between processes. Interface: data Chan a newChan :: 10 (Chan a) readChan :: Chan a -> 10 a writeChan :: Chan a -> a -> 10 () IA014 10. I/O and Concurrency 19 Channel implementation Channel it - r □ B Item Item Item Read end B>[|0>B>[^0>&l^D Write end First value Channel contents: Second value Third value type Stream a data Item a MVar (Item a) Item a (Stream a) Channel data type: data Chan a = Chan (MVar (Stream a)) (MVar (Stream a) (we have pointers to the first and last element) IA014 10. I/O and Concurrency Channel access newChan :: 10 (Chan a) newChan = do hole <- newEmptyMVar readVar <- newMVar hole writeVar <- newMVar hole return (Chan readVar writeVar writeChan :: Chan a -> a -> 10 () writeChan (Chan _ writeVar) val = do newHole <- newEmptyMVar oldHole <- takeMVar writeVar putMVar oldHole (Item val newHole) putMVar writeVar newHole readChan :: Chan a -> 10 a readChan (Chan readVar _) = do stream <- takeMVar readVar Item val tail <- takeMVar stream putMVar readVar tail return val IA014 10. I/O and Concurrency 21 Software Transactional Memory (STM) IA014 10. I/O and Concurrency 22 Motivation Problems with lock-based approaches • races (missing some locks) • deadlocks (wrong order of locks) • too conservative use (inhibits concurrency) • simple typos • locks do not compose well (no support for modular programming) IA014 10. I/O and Concurrency 23 Atomic memory transactions • alternative to locking • inspiration in the database world (transactions, ACID) • mark a block of code which should be atomic atomically • whole block or nothing - Atomicity • execute blocks independently - Isolation • no lock induced deadlocks (no locks!) • easy error recovery Such approaches are known as Software Transactional Memory (STM) IA014 10. I/O and Concurrency 24 How to implement STM? One possible way: • execute the atomic block without any locking (optimistic synchronization) • record every memory read and write • writes are simulated on side • after the block execution is finished, validate the transaction • check, that every read variable has the same value as was recorded in the log • if valid, commit the changes to variables • if not, re-run the atomic block IA014 10. I/O and Concurrency 25 Simplistic approach to atomically "That's what 10 is for..." atomically :: 10 a -> 10 a Sample use: main = do r <- newIORef 0 fork (atomically (incRef r)) atomically (incRef r) where incRef var = do v <- readlORef var writelORef var (v+1) Problems: • What if we forget to add the atomically wrapper? • I/O actions do not work well in this model: atomically (if n>k then launch_missiles) IA014 10. I/O and Concurrency 26 The STM monad • introduces "tracked" imperative variables (TVa (also known as "transactional variables") • the monad then "tracks" the transactions data STM a :: * -> * atomically :: STM a -> 10 a ret ry : : STM a orElse :: STM a -> STM a -> STM a check :: Bool -> STM () data TVar a newTVar :: a -> STM (TVar a) readTVar :: TVar a -> STM a writeTVar :: TVar a -> a -> STM () IA014 10. I/O and Concurrency Example: banking account type Account = TVar Int withdraw :: Account -> Int -> STM () withdraw acc amount = do bal <- readTVar acc writeTVar acc (bal - amount) deposit :: Account -> Int -> STM () deposit acc amount = withdraw acc (- amount) Type system guarantees that TVars cannot be used outside transactions: bad :: Account -> 10 () bad acc = do hPutStr stdout "Withdrawing, withdraw acc 10 good :: Account -> 10 () good acc = do hPutStr stdout "Withdrawing atomically (withdraw acc 10 IA014 10. I/O and Concurrency 28 Blocking: retry What if there are not sufficient funds? limitedWithdraw :: Account -> Int -> STM () limitedWithdraw acc amount = do bal <- readTVar acc if amount > 0 && amount > bal then retry else writeTVar acc (bal - amount) retry semantics: • current transaction is abandoned and retried at some later time (retrying immediately makes no sense) • efficient implementation would wait for some write to acc (Why acc? Easily detected in the transaction log!) IA014 10. I/O and Concurrency 29 Check & retry The pattern we used on the previous slide is very common: Check a boolean condition, and retry if not satisfied. check :: Bool -> STM () check True = return () check False = retry Now we can rewrite limitedWithdrawal in a more concise way: limitedWithdraw :: Account -> Int -> STM () limitedWithdraw acc amount = do bal <- readTVar acc check (amount <= 0 || amount <= bal) writeTVar acc (bal - amount) IA014 10. I/O and Concurrency 30 Trying alternative ways: orElse What if we have two accounts, and, if the first one does not have sufficient funds, want to make withdrawal from the second? The perfect job for orElse :: STM a -> STM a -> STM a! limitedWithdraw2 :: Account -> Account -> Int -> STM () limitedWithdraw2 accl acc2 amt = orElse (limitedWithdraw accl amt) (limitedWithdraw acc2 amt) The semantics of orElse al a2: • tries to perform al • if al retries, tries to perform a2 • if a2 retries, the whole action retries IA014 10. I/O and Concurrency 31 It's Christmas! The Santa Claus problem Santa repeatedly sleeps until wakened by either all of his nine reindeer, back from their holidays, or by a group of three of his ten elves. If awakened by the reindeer, he harnesses each of them to his sleigh, delivers toys with them and finally unharnesses them (allowing them to go off on holiday). If awakened by a group of elves, he shows each of the group into his study, consults with them on toy R&D and finally shows them each out (allowing them to go back to work). Santa should give priority to the reindeer in the case that there is both a group of elves and a group of reindeer waiting. Trono, 1994 IA014 10. I/O and Concurrency 33 Reading • S. Peyton Jones: Tackling the Awkward Squad: monadic input/output, concurrency, exceptions, and foreign-language calls in Haskell. Marktoberdorf 2001 • S. Peyton Jones: Beautiful Concurrency. Chapter in Beautiful Code, 2007. • S. Marlow: Parallel and Concurrent Programming in Haskell. 2012. • 10 inside. Haskellwiki. https://www.haskell.org/haskellwiki/10_inside Original papers: • T. Harris, S. Marlow, S. Peyton Jones, M. Herlihy: Composable Memory Transactions. PPoPP'05. IA014 10. I/O and Concurrency 34