Haskell Concurrency
1. Hello Threads
In haskell, an MVar is a key building block for concurrency. All interactions between threads happen in the IO monad. To compile it, we will use -O2 and -threaded flags. When running the program, we run it with +RTS -N8 to use 8 threads.
Creating a Haskell Thread
forkIO :: IO () -> IO ThreadIdfromControl.Concurrentcreates a new lightweight thread to run the givenIOaction.
1import Control.Concurrent 2import Control.Monad 3import System.IO 4 5putChars :: Char -> Int -> IO () 6putChars c 0 = return () 7putChars c n = do 8 putChar c 9 putChars c (n - 1) 10 11main = do 12 hSetBuffering stdout NoBuffering -- Disable output buffering 13 forkIO $ putChars 'a' 100000 14 forkIO $ putChars 'b' 100000 15 forkIO $ putChars 'c' 100000 16 return ()
2.1 Joining with MVar
If we don't join the threads the main thread can finish before the other threads:
1import Control.Concurrent 2import Control.Monad 3import System.IO 4 5main = do 6 hSetBuffering stdout NoBuffering 7 forkIO $ print "Thread 1" 8 forkIO $ print "Thread 2" 9 return ()
MVar
A
MVarin haskell is a mutable variable. This is either empty or full. This is different from maybe, as it defines mutable state. Operations onMVarinclude:
newMVar :: a -> IO (MVar a)creates a newMVarcontaining the given value.newEmptyMVar :: IO (MVar a)creates a new emptyMVar.takeMVar :: MVar a -> IO ablocks untilMVaris full then removes and returns the value.putMVar :: MVar a -> a -> IO ()blocks untilMVaris empty then puts the value into it.readMVar :: MVar a -> IO ablocks untilMVaris full then returns the value without removing it.All operations on
MVarare atomic.
Lets use an MVar to do a thread join.
1import Control.Concurrent 2import Control.Monad 3import System.IO 4 5printThenJoin :: String -> MVar () -> IO () 6printThenJoin s handle = do 7 print s 8 putMVar handle () -- Thread is done 9 10main = do 11 hSetBuffering stdout NoBuffering 12 handle1 <- newEmptyMVar 13 handle2 <- newEmptyMVar 14 forkIO $ printThenJoin "I am thread 1" handle1 15 forkIO $ printThenJoin "I am thread 2" handle2 16 takeMVar handle1 17 takeMVar handle2 18 putStrLn "Both threads done" 19 return ()
2.2 Mutexes with MVar
MVars can be used for mutual exclusion:
1thread :: ... -> MVar () ... -> IO () 2thread ... mutex ... = do 3 ... 4 putMVar mutex () -- Lock 5 -- Critical section 6 takeMVar mutex -- Unlock 7 8main = do 9 mutex <- newEmptyMVar 10 forkIO $ thread ... mutex ... 11 forkIO $ thread ... mutex ... 12 ...
1import Control.Concurrent 2import Control.Monad 3import System.IO 4 5protectedPrint :: String -> MVar () -> MVar () -> IO () 6protectedPrint s mutex handle = do 7 putMVar mutex () -- Lock 8 print s 9 takeMVar mutex -- Unlock 10 putMVar handle () -- Signal done 11 12main = do 13 hSetBuffering stdout NoBuffering 14 mutex <- newEmptyMVar 15 handle1 <- newEmptyMVar 16 handle2 <- newEmptyMVar 17 forkIO $ protectedPrint "I am thread 1" mutex handle1 18 forkIO $ protectedPrint "I am thread 2" mutex handle2 19 takeMVar handle1 20 takeMVar handle2 21 putStrLn "Both threads done" 22 return ()
2.3 Producer-Consumer with MVar
1sumChannel :: MVar Int -> Int -> IO Int 2sumChannel c 0 = return 0 3sumChannel c n = do 4 x <- takeMVar c 5 rest <- sumChannel c (n - 1) 6 return (x + rest) 7 8sendToConsumers :: MVar Int -> Int -> IO () 9sendToConsumers p2c 0 = return () 10sendToConsumers p2c n = do 11 putMVar p2c n 12 sendToConsumers p2c (n - 1) 13 14producer :: MVar Int -> MVar () -> Int -> Int -> MVar Int -> IO () 15producer p2c c2p numElemsToProduce numConsumers finalResult = do 16 sendToConsumers p2c numElemsToProduce 17 result <- sumChannel c2p numConsumers 18 putMVar finalResult result 19 20consumer :: MVar Int -> MVar Int -> Int -> IO () 21consumer p2c c2p numElemsToConsume = do 22 myresult <- sumChannel p2c numElemsToConsume 23 putMVar c2p myresult 24 25main = do 26 p2c <- newEmptyMVar :: IO (MVar Int) 27 c2p <- newEmptyMVar :: IO (MVar ()) 28 producerHandle <- newEmptyMVar :: IO (MVar Int) 29 forkIO $ producer p2c c2p 200 2 producerHandle 30 forkIO $ consumer p2c c2p 100 31 forkIO $ consumer p2c c2p 100 32 result <- takeMVar producerHandle 33 print result 34 return ()
Replicating a Monad
To replicate a monadic action
ntimes, we can usereplicateMfromControl.Monad:replicateM :: Monad m => Int -> m a -> m [a]runs the given monadic actionntimes and collects the results in a list.For example, to take from an MVar 10 times in a row:
replicateM 10 (takeMVar myMVar) :: IO [Int]
2.4 Unbounded Channels
A channel can be thought of as a linked list of MVars. Each node contains a value and a pointer to the next node. A channel has a read end and a write end. Consumers read from the read end and producers write to the write end. It is structured as:
- Read End:
MVar(points to the first stream) - Write End:
MVar(points to the last stream)
Then:
- Stream -
MVar(points to the next node) - Item - A value and a pointer to the next stream
- Stream -
MVar(points to the next node) - Item - A value and a pointer to the next stream
- ...
- Item - A value and a pointer to the next stream
- Stream -
MVar(points to the next node)
To implement it:
1data Channel a = Channel (MVar (Stream a)) (MVar (Stream a)) 2 3type Stream a = MVar (Item a) 4 5data Item a = Item a (Stream a) 6 7 8newChannel :: IO (Channel a) 9newChannel = do 10 emptyStream <- newEmptyMVar 11 readEnd <- newMVar emptyStream 12 writeEnd <- newMVar emptyStream 13 return (Channel readEnd writeEnd) 14 15readChannel :: Channel a -> IO a 16readChannel (Channel readEnd _) = do 17 readEndStream <- takeMVar readEnd 18 (Item value remainder) <- takeMVar readEndStream 19 putMVar readEnd remainder 20 return value 21 22writeChannel :: Channel a -> a -> IO () 23writeChannel (Channel _ writeEnd) value = do 24 newEmptyStream <- newEmptyMVar 25 writeEndStream <- takeMVar writeEnd 26 putMVar writeEndStream (Item value newEmptyStream) 27 putMVar writeEnd newEmptyStream 28