Massive Information Processing - Concurrency and Fault-Tolerance: The Google Approach

The informational needs of our society are enormous and growing. Data sets are now measured in petabytes and exabytes, and computing needs in tera-, peta-, and even exaflops. [For a general discussion of these issues you may want to read Randall Bryant: Data-Intensive Supercomputing: The Case for DISC.]
At this point there is little hope of developing much faster sequential processors, and processing large amounts of data on a single store is slow [a one-terabyte file on a single disk read at ~40MB/s takes over 7 hours to read, but divided over 1000 disks it can be read in less than a minute].
To scale to larger problems we need to move from individual sequential systems to multi-core chips (MCC) and/or multiple chips arranged in shared memory systems (SMS). We have seen a programming model based on threads that is designed to run on such systems, and we have observed how difficult it is to program concurrency at this level. At a higher level of aggregation, for bigger problems, we can compose MCCs and/or SMSs into clusters (physically close elements) or into more widely distributed systems. As the size and complexity of these systems grow, so does the complexity of programming them. More dramatically, the probability of faults becomes intolerably high [in a 1000-node system with disks, disk/machine crashes occur every day].

We have seen an example of bypassing the concurrency/reliability problem in a limited context: atomic transactions. As you remember, we can access a distributed database using transactions without having to worry about concurrency (locks are applied automatically) or about crashes (the system maintains logs and uses commit protocols). Here we will discuss another example of bypassing the concurrency + reliability problem in another limited context, for truly massive data and workloads.

The Google Search Framework

Google is extremely protective of its technical know-how, but over the years a sufficient number of technical papers and presentations have become available to provide a general idea of its systems and algorithms.

We all know how Google search operates. It proceeds as follows

Of course I was forgetting the most important activity, inserting ads in the list so that there is a rich revenue stream for Google.

The interesting thing is that what started as a search-specific approach has resulted in systems and a programming model that are proving extremely valuable in solving more general problems involving massive data sets and complex computations.

Google Strategic Choices

Google decided against using a few expensive high-performance servers and went instead with many low-cost, low-performance servers. These servers are aggregated in clusters with more than a thousand servers each [a cluster supposedly consists of tens of racks; the nodes inside a rack (about 80 per rack) are connected by a switch with something like a 1Gb/s data rate between any two nodes, and the racks are connected through a switch with a total data rate on the order of 2.5Gb/s]. There are hundreds of such clusters. The same data is kept at different clusters at different locations on the globe. Google DNS servers route user queries to replicas that are close to the querier, to reduce latency and to balance the load across the candidate replicas. A query is processed entirely within a single cluster. It is expected that servers and their disks will fail, thus software must be written to make the system fault-tolerant.
Since Google is dealing with specialized problems, with data of greater size than other organizations, and with users who expect low latencies, Google decided to write the large majority of the software needed to service its computing infrastructure. This required access to the code of existing software so that it could be extended, hence the use of open source software (which also has an economic advantage). And it required high-quality programmers; in fact Google has strenuously worked to attract and hire the best graduating computer science students. Fault-tolerance was a priority, as was a high degree of parallelism to complete work rapidly and with low latency. To get an idea of this parallelism, Google claims that on average completing a user query involves 700 computers and the processing of hundreds of millions of bytes! Finally, Google chose not to go for universal solutions, only for solutions to its specific problems. It just happens that those solutions solve more than their specific problems.

Principal Elements of Google's Systems

We will discuss four elements of the Google approach. These components are being made available as open source by the Apache Hadoop project. A number of lectures on these topics are available: Google Code University: Distributed Systems. Testbeds are being developed by various organizations (for example the HP, Intel, and Yahoo partnership) for use by researchers. A version that runs on a single computer system is even available. NSF has organized a workshop for educators to facilitate the adoption of this framework and software.

We will later look at the Google PageRank algorithm as an example of a large computation that is run on the Google infrastructure using the MapReduce programming model.

The Google Distributed File System (GFS)

There is a single master and many chunkservers. Since the master is a single point of failure, we will need to examine later the ways in which it is made fault-tolerant. It maintains permanent metadata about the files:

In addition it caches the mapping from chunks to their current locations (this information is maintained permanently by the chunkservers and uploaded to the master). During operation all this information is kept in main memory for speedy access. The master is the coordinator of the GFS. It supports the concept of "file" while the chunkservers only know about individual "chunks". It is in frequent communication with the chunkservers (there is a heartbeat protocol). To avoid becoming a communication bottleneck the master is only involved in control operations, not in the transfer of data. Data instead is transferred between clients and chunkservers. Among the chunkservers storing a chunk, one is selected by the master to be the primary chunkserver, responsible for the chunk for a limited time: it is given a lease (usually 60 seconds).
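
To make the division of labor concrete, here is a minimal sketch in C++ of the kind of state the master might keep in memory. This is not Google's code; the type and field names are invented for illustration.

	#include <cstdint>
	#include <map>
	#include <string>
	#include <vector>

	// Hypothetical sketch of the master's in-memory state; all names are invented.
	using ChunkHandle = uint64_t;

	struct ChunkInfo {
	    std::vector<std::string> replicaLocations; // chunkservers holding the chunk (cached, reported by chunkservers)
	    std::string primary;                       // chunkserver currently holding the lease, if any
	    int64_t leaseExpirationMs = 0;             // a lease lasts about 60 seconds
	};

	struct FileInfo {
	    std::vector<ChunkHandle> chunks;           // chunk handles in file order (permanent metadata, logged)
	};

	struct MasterState {
	    std::map<std::string, FileInfo> fileNamespace;   // file name -> its chunks
	    std::map<ChunkHandle, ChunkInfo> chunkLocations; // chunk handle -> where it currently lives
	};

	int main() {
	    MasterState master;
	    master.fileNamespace["/data/bigfile"].chunks = {42, 43};
	    master.chunkLocations[42].replicaLocations = {"cs1", "cs7", "cs9"};
	    master.chunkLocations[42].primary = "cs1"; // holds the lease for chunk 42
	}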

Let's examine how a client reads from a file at a specified cursor position.

  1. the client, on the basis of the cursor position, determines the ordinal position of the chunk in the file (divide the cursor position by the chunk size) and sends to the master a request with the file name and chunk index
  2. the master replies with the chunk handle and the locations of the replicas
  3. the client sends to one of the replicas the request with the chunk handle and the requested byte range.
Reads that span multiple chunks are split into separate reads. These reads will independently succeed or fail. The reading client will have to deal with such situations.
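
The client-side arithmetic of step 1, and the splitting of a read that spans chunk boundaries, can be sketched in C++ as follows. This is only an illustration: the RPCs to the master and to a chunkserver replica are omitted, and the 64MB chunk size is the one reported in the GFS paper.

	#include <algorithm>
	#include <cstdint>
	#include <iostream>
	#include <vector>

	const int64_t kChunkSize = 64 * 1024 * 1024; // GFS chunks are 64MB

	struct ChunkRead {
	    int64_t chunkIndex;    // ordinal position of the chunk in the file (sent to the master)
	    int64_t offsetInChunk; // byte offset inside that chunk (sent to a replica)
	    int64_t length;        // bytes to read from that chunk
	};

	// A read that spans several chunks is split into one request per chunk;
	// each piece succeeds or fails independently.
	std::vector<ChunkRead> SplitRead(int64_t cursor, int64_t length) {
	    std::vector<ChunkRead> pieces;
	    while (length > 0) {
	        int64_t index = cursor / kChunkSize;
	        int64_t offset = cursor % kChunkSize;
	        int64_t n = std::min(length, kChunkSize - offset);
	        pieces.push_back({index, offset, n});
	        cursor += n;
	        length -= n;
	    }
	    return pieces;
	}

	int main() {
	    // A 100MB read starting 10MB before the first chunk boundary touches chunks 0, 1 and 2.
	    for (const ChunkRead& r : SplitRead(kChunkSize - 10 * 1024 * 1024, 100 * 1024 * 1024))
	        std::cout << "chunk " << r.chunkIndex << " offset " << r.offsetInChunk
	                  << " length " << r.length << "\n";
	}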

Let's now look at the more complex write operation:

  1. again the client determines the index of the chunk it needs to write to and sends to the master the file name and chunk id
  2. if currently there is no primary for that chunk, the master selects one of the chunkservers storing that chunk to be the primary and grants it a lease. It then sends to the client the identity of the primary and of the (secondary) replicas.
  3. the client pushes the data to all the replicas. These replicas will keep the data in temporary storage and will acknowledge it to the client.
  4. Once all replicas have acknowledged the data, the client sends the write request to the primary, which may have received and not yet carried out a number of mutation (write/append) requests. The primary assigns a sequential order to these mutations and carries them out on its local copy.
  5. The primary sends the write request and its sequential index to the other (secondary) replicas that will carry them out in the specified sequential order.
  6. The secondary replicas acknowledge to the primary when they have successfully completed the mutation.
  7. The primary replies to the client indicating the success or failure of the operation. [Note that the write may have succeeded at some replicas, failed at others.] It is the responsibility of the client to retry the operation in case of failure.
Writes that span multiple chunks are split into separate writes by the client.

Write data and control data flow

Let's now consider the append operation. It proceeds as in the first 3 steps of the write operation. Then, when the client sends the request to the primary, the primary checks whether the appended data fits in the current chunk. If not, it fills the current chunk with filler data, asks the replicas to do the same, and tells the client to retry the operation on the next chunk. Otherwise it appends the data to its replica and asks the secondaries to do the same at the exact same offset. When all secondaries acknowledge success it reports success to the client; otherwise it reports failure and the client is supposed to retry the operation. As a consequence, the file may contain filler gaps, partial writes, even multiple copies of the same data (caused by retries). It is a kind of at-least-once semantics.
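
Clients therefore need some logic of their own to cope with these artifacts. One technique mentioned in the GFS paper is to make records self-identifying so that a reader can skip padding and discard duplicates; here is a minimal sketch, with an invented record format.

	#include <cstdint>
	#include <iostream>
	#include <set>
	#include <string>
	#include <vector>

	// Self-identifying records let a reader skip padding and drop duplicates
	// produced by append retries. The record format is invented for illustration.
	struct Record {
	    uint64_t id;         // unique id chosen by the writer; 0 marks filler/padding
	    std::string payload;
	};

	std::vector<Record> ReadValidRecords(const std::vector<Record>& chunkContents) {
	    std::set<uint64_t> seen;
	    std::vector<Record> result;
	    for (const Record& r : chunkContents) {
	        if (r.id == 0) continue;                  // filler written to pad out a chunk
	        if (!seen.insert(r.id).second) continue;  // duplicate from a retried append
	        result.push_back(r);
	    }
	    return result;
	}

	int main() {
	    // Record 7 was appended twice because the first attempt reported failure
	    // even though some replicas had already applied it.
	    std::vector<Record> chunk = {{7, "a"}, {0, ""}, {7, "a"}, {9, "b"}};
	    for (const Record& r : ReadValidRecords(chunk))
	        std::cout << r.id << " " << r.payload << "\n";
	}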

It is important to note that the consistency of GFS does not coincide with the intuitive notion of consistency [that the result of a read reflects a sequential, time-ordered execution of the mutations that have preceded the read]. But it is adequate for the purposes of clients that have been written with logic to work around the pitfalls. Successful mutations taking place sequentially will result in the same correct data being available at all replicas. But concurrent successful mutations caused by different clients may result in the same data at all replicas, yet not the same as the data corresponding to a possible sequential execution of these mutations (i.e. some chunk may contain partial data of multiple concurrent mutations).

There are many more complexities in the Google File System. I will not deal with them. I will only mention that some locking facilities are provided and then go back to the problem of a single master for the system. To guard against failures the master uses a persistent operation log where it records all changes to the metadata. This log, for reliability, is replicated. No change is visible to a client unless it has first been recorded in the log and its replica(s). The master, in case of a crash, can recover by replaying the log, and to speed up recovery the master state is regularly checkpointed. It will also need to rebuild its cache of the chunk-handle-to-location map by querying the chunkservers.

The MapReduce Programming Model

We first look at the MapReduce programming model to understand how it is used. Then we look at how it is implemented in a way that automatically makes it concurrent and fault-tolerant.

Map and reduce are pure functions. Map, given as argument a (key, value) pair, produces a set of (key, value) pairs. The type of the map input key (the input key type) and of the map output keys (the intermediate key type) may be the same or different. The same holds for the values (input value type and intermediate value type). The reduce function takes as argument a pair consisting of an intermediate key and a list of intermediate values and produces a value (output value type; this type may itself be a key, value pair). All keys and values are represented as strings.

To get an immediate feeling for what map and reduce may be, here is the "Hello World" program for MapReduce: WordCount

	map(String name, String document):
	 // key: document name
	 // value: document contents
	 for each word w in document:
	   EmitIntermediate(w, "1");

	reduce(String word, Iterator partialCounts):
	 // key: a word
	 // values: a list of aggregated partial counts
	 //	here seen as an iterator on the list
	 int result = 0;
	 for each v in partialCounts:
	   result += ParseInt(v);
	 Emit(AsString(result));
As you see, the input pair has the form [URL of a document, document content], and the intermediate pairs produced by map have the form [word, 1]. Reduce is given pairs of the form [word, (1, 1, ..., 1)] where the 1s are obtained from all the pairs generated by a map function that have that word as first component. It then produces the value [word, count], where count is the sum of the 1s. In other words, we give map a URL and the corresponding document, and map produces from it pairs consisting of the words in the document and 1. Then in an intermediate stage all the pairs with the same word are put together to produce the pair [word, (1,1,..,1)], which is given to reduce, which produces from it the pair [word, sum-of-the-ones].
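
To see the whole pipeline in one place, here is a small self-contained C++ simulation of the WordCount job. The grouping of intermediate pairs by key (the "shuffle") is done here with an in-memory map; in the real system it is distributed over many machines.

	#include <iostream>
	#include <map>
	#include <sstream>
	#include <string>
	#include <utility>
	#include <vector>

	using Pair = std::pair<std::string, std::string>;

	// map: (document name, document contents) -> list of (word, "1") pairs
	std::vector<Pair> Map(const std::string& name, const std::string& contents) {
	    std::vector<Pair> out;
	    std::istringstream in(contents);
	    std::string word;
	    while (in >> word) out.push_back({word, "1"});
	    return out;
	}

	// reduce: (word, list of partial counts) -> total count, as a string
	std::string Reduce(const std::string& word, const std::vector<std::string>& counts) {
	    int result = 0;
	    for (const std::string& c : counts) result += std::stoi(c);
	    return std::to_string(result);
	}

	int main() {
	    std::vector<Pair> documents = {{"doc1", "to be or not to be"}, {"doc2", "to do"}};

	    // Map phase, then the grouping of intermediate pairs by key (the shuffle).
	    std::map<std::string, std::vector<std::string>> grouped;
	    for (const Pair& doc : documents)
	        for (const Pair& kv : Map(doc.first, doc.second))
	            grouped[kv.first].push_back(kv.second);

	    // Reduce phase: one call per intermediate key.
	    for (const auto& kv : grouped)
	        std::cout << kv.first << " " << Reduce(kv.first, kv.second) << "\n";
	}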

A MapReduce program is set up by choosing the number M of map tasks (usually in the thousands) and the number R of reduce tasks (usually in the hundreds). [By the way, each map task is implemented as a real task, and the same holds for reduce tasks.] An input reader, possibly written by the user, partitions the input data into M splits and feeds them to the M map tasks. The map tasks produce the intermediate pairs and place them in buffer storage. Then a partition function, usually selected by the user, for example to hash the intermediate keys into R values, places the resulting R partitions on local disk and informs the R reduce tasks of the locations of the partitions (in reality it informs the master task, which in turn informs the reduce tasks). Each reduce task reads the intermediate pairs that have been assigned to it, sorts them using the compare function specified by the user, collects into a list the values associated with each intermediate key, and then applies the reduce function to the resulting pairs. The result is usually given to an output writer for writing to global stable storage as R files.
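
The default partition function described in the MapReduce paper is a hash of the intermediate key modulo R, which guarantees that all pairs with a given key, no matter which map task produced them, end up at the same reduce task. A sketch:

	#include <functional>
	#include <iostream>
	#include <string>
	#include <vector>

	// Default-style partitioning of intermediate keys over R reduce tasks.
	int Partition(const std::string& intermediateKey, int R) {
	    return static_cast<int>(std::hash<std::string>{}(intermediateKey) % R);
	}

	int main() {
	    const int R = 4;
	    std::vector<std::string> keys = {"hello", "world", "hello"};
	    for (const std::string& key : keys)
	        std::cout << key << " -> reduce task " << Partition(key, R) << "\n";
	    // The two occurrences of "hello" are guaranteed to map to the same reduce task.
	}
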
A MapReduce computation can be described by the following diagrams which emphasize different aspects of the computation.

This diagram expresses the computation flow we described above, from input data in global persistent storage, through computation with an intermediate stage with persistent storage, to output data in global storage.

This diagram emphasizes the tasks being executed. The user program results in worker tasks (M map tasks and R reduce tasks) plus a master task coordinating and supervising the work of the worker tasks. The system schedules the worker tasks on the nodes of the cluster, trying to optimize placement to minimize communication overhead. The master communicates with the workers using a heartbeat protocol. The reduce tasks cannot start the reduce activity until the map tasks are completed.
If a map or reduce task crashes, the master will start a new copy of that task. If a map task is particularly slow the master may start another map task to do the same computation; for a given input split the reduce tasks will accept the output of the first completed map task assigned to that split. The master may even decide, in case some split results in multiple crashes, to complete the computation without that split.
The essential message of this diagram is that the MapReduce programmer need not worry about issues of concurrency and fault-tolerance, since all scheduling, monitoring, and re-execution of worker tasks is managed by the MapReduce master. The master is as usual a single point of failure, so its execution must be supported by a replicated log that allows the computation to be recovered in case the master crashes.

This third diagram makes a bit clearer that if a reduce task receives an intermediate key X, then it receives all the pairs that have intermediate key X.

A computation may involve, as in the case of WordCount, a single MapReduce pass. But many computations require multiple passes, each executed as a MapReduce computation.
The PageRank computation used by Google Search is an example of a multi-pass MapReduce computation. We will examine it next.

The PageRank Computation as an Iterated MapReduce Computation

When answering a search query we would like to propose to the user documents ordered in a way that represents their likelihood of satisfying the user's needs. That is the PageRank of the document. The Google founders, Brin and Page, suggested as page rank the probability that a user moving among web pages would end up at that page. First of all, given the graph of the web, they suggested a transition probability matrix T expressing the probability, if we are at node h, of going to node k. Suppose the web graph is as follows

We can define T[h,k] as the number of arcs going from h to k divided by the total number of arcs leaving h. So, for example, numbering the nodes from 0, we will have T[1,3] = 0.4. The problem with this choice is node 4, which has no successors and would give us a row of T filled with zeros. To get around this problem we assume that from node 4 we may go with equal probability to any of the nodes, i.e. with probability 1/6. The transition matrix for the given graph becomes
	0      1      0      0      0      0
	0      0      0.2    0.4    0.2    0.2
	0.5    0      0      0      0.5    0
	0.5    0      0      0      0      0.5
	0.166  0.166  0.166  0.166  0.166  0.166
	0      0      1      0      0      0
In reality people don't just follow links; at times they jump at random to other pages. To reflect this fact the transition probability is changed according to the formula:

 	T'[h, k] = (1-d)/N + d * T[h, k]
where d is usually chosen to be 0.85. For simplicity this new transition matrix is again called T, and in our example it is
	0.025  0.875  0.025  0.025  0.025  0.025
	0.025  0.025  0.195  0.365  0.195  0.195
	0.450  0.025  0.025  0.025  0.450  0.025
	0.450  0.025  0.025  0.025  0.025  0.450
	0.166  0.166  0.166  0.166  0.166  0.166
	0.025  0.025  0.875  0.025  0.025  0.025

We can use this transition matrix to determine the probability of ending at node k in one step, starting from node h (for simplicity let's assume that h is 0), by multiplying the row vector [1,0,0,0,0,0] by the matrix T, obtaining the row vector [0.025,0.875,0.025,0.025,0.025,0.025]. In turn, the probability of being at page k after 2 steps is obtained by multiplying this last row vector by T, and so on. It can be proven that the iteration converges to a result that is independent of our initial choice of h (we had chosen h = 0).
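
As a sanity check of this construction, here is a short C++ program that builds T' from T using the formula above (d = 0.85, N = 6) and runs the iteration; after enough steps the probability vector stops changing, and its entries are the page ranks of the six nodes.

	#include <array>
	#include <cstdio>

	const int N = 6;
	using Matrix = std::array<std::array<double, N>, N>;
	using Vector = std::array<double, N>;

	int main() {
	    const double d = 0.85;
	    // The transition matrix T of the example graph (row 4 is the node with no successors).
	    Matrix T = {{{0,     1,     0,     0,     0,     0},
	                 {0,     0,     0.2,   0.4,   0.2,   0.2},
	                 {0.5,   0,     0,     0,     0.5,   0},
	                 {0.5,   0,     0,     0,     0,     0.5},
	                 {1.0/6, 1.0/6, 1.0/6, 1.0/6, 1.0/6, 1.0/6},
	                 {0,     0,     1,     0,     0,     0}}};

	    // T'[h][k] = (1-d)/N + d * T[h][k]
	    Matrix Tp;
	    for (int i = 0; i < N; i++)
	        for (int j = 0; j < N; j++)
	            Tp[i][j] = (1 - d) / N + d * T[i][j];

	    // Power iteration: p_{k+1}[j] = SUM_i p_k[i] * T'[i][j], starting at node 0.
	    Vector p = {1, 0, 0, 0, 0, 0};
	    for (int step = 0; step < 100; step++) {
	        Vector next = {};
	        for (int j = 0; j < N; j++)
	            for (int i = 0; i < N; i++)
	                next[j] += p[i] * Tp[i][j];
	        p = next;
	    }
	    for (int j = 0; j < N; j++) printf("page %d: %.3f\n", j, p[j]);
	}
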
All this is well and good for small graphs, but what do we do when the graph has hundreds of millions of vertices? Brin and Page suggested an alternative iterative procedure to compute the page rank. They used the following formula

	PR(T) = (1-d) + d * SUM(PR(Th)/C(Th))
where
	PR is the page rank of the page under consideration, T
	d is a tunable parameter corresponding to the idea that users do not just follow links,
		they also jump to a new page "randomly". Usually d is taken to be 0.85
	Th ranges over the pages with a link to T
	C(Th) is the number of out links from Th
then the iterative procedure starts with an initial assumption on the value of PR and determines a new distribution using the above formula, that is
	PR_{k+1}(T) = (1-d) + d * SUM(PR_k(Th)/C(Th))

Usually 20 or 30 iterations are made.

Now at last we get back to the PageRank computation using MapReduce. The computation is done in two phases.

PHASE 1:
	It consists of a single MapReduce computation

	The input consists of (URL, page content) pairs
	Map takes as input a (URL, page content) pair and produces the pair
		(URL, (PR_initial, list of all urls pointed to from URL))
	Reduce is the identity function

PHASE 2:
	It carries out MapReduce iterations until a convergence criterion has been reached.
	The kth iteration has the following form

	The input in the first iteration consists of pairs as were produced by Phase 1,
	in successive iterations it consists of the output of the previous iteration.

	Map takes as input a pair (URL, (PR_k(URL), url-list of all the urls pointed to from URL))
		and produces the pair
			(URL, url-list)
		and, for each url u in url-list, the pair
			(u, PR_k(URL)/Cardinality_of_url-list)
	Reduce for a given url u will sum up all values of the second kind of pair, multiply
		the sum by d and add 1-d to it, obtaining a value which of course is
		PR_{k+1}(u)
		It will then output the pair
			(u, (PR_{k+1}(u), url-list)) // this url-list is obtained from the (URL, url-list) pair
						// whose URL is now u
As you see, it is not a trivial program, but it is feasible because the many map tasks can be done in parallel and the same is true for the reduce tasks.
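
To make the Phase 2 step concrete, here is a single-process C++ sketch of one iteration, with the map and reduce roles written out and the shuffle simulated in memory. The tiny three-page graph and the value encoding are invented for illustration; the real framework represents keys and values as strings and distributes the work.

	#include <cstdio>
	#include <map>
	#include <string>
	#include <vector>

	// An intermediate value is either a rank contribution or a page's url-list.
	struct Value {
	    bool isLinks;                   // true: url-list, false: rank contribution
	    double contribution;
	    std::vector<std::string> links;
	};

	struct PageState { double rank; std::vector<std::string> links; }; // (PR_k(URL), url-list)

	int main() {
	    const double d = 0.85;
	    std::map<std::string, PageState> pages = {   // output of the previous iteration
	        {"a", {1.0, {"b", "c"}}},
	        {"b", {1.0, {"c"}}},
	        {"c", {1.0, {"a"}}}};

	    // Map: emit (URL, url-list) and, for each outgoing url u, (u, PR_k(URL)/C(URL));
	    // then group by key, which is what the shuffle does.
	    std::map<std::string, std::vector<Value>> grouped;
	    for (const auto& p : pages) {
	        grouped[p.first].push_back({true, 0.0, p.second.links});
	        for (const std::string& u : p.second.links)
	            grouped[u].push_back({false, p.second.rank / p.second.links.size(), {}});
	    }

	    // Reduce: PR_{k+1}(u) = (1-d) + d * sum of contributions, re-emitted with u's url-list.
	    for (const auto& kv : grouped) {
	        double sum = 0;
	        std::vector<std::string> links;
	        for (const Value& v : kv.second)
	            if (v.isLinks) links = v.links; else sum += v.contribution;
	        printf("%s: PR = %.3f, %zu out-links\n",
	               kv.first.c_str(), (1 - d) + d * sum, links.size());
	    }
	}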

The Bigtable Distributed Storage System

Bigtable is designed to deal with data sets larger than commercial databases can handle. On the other hand Bigtable does not support all the services available in commercial databases, in particular full SQL. In fact, though it supports operations on a table (and a Bigtable cluster may contain many big tables), it does not directly support operations across tables. It can be used both as input and as output of MapReduce computations. Its goals are wide applicability, scalability, high performance, and high availability. Bigtable is a large software system, over 100,000 lines of C++ code, excluding the additional testing code.
The data model of Bigtable is (surprise) a table. The canonical Bigtable instance is Webtable. The rows are identified by keys (strings); in Webtable these are URLs. The URLs are stored in reverse order, so www.cis.temple.edu is stored as edu.temple.cis.www, and the rows are sorted by their row keys, thus related URLs are close to each other.
The columns are identified by a family plus a column qualifier (strings); in Webtable, for instance, we have the "anchor" family with URL names as column qualifiers, so a column may be identified by "anchor:nytimes.com", specifying a link to the current page. A cell's content consists of multiple timestamped values, for instance the value of the anchor:nytimes.com cell at successive times. These values are sorted in decreasing order of their timestamps. A cell can be set up to retain up to a specified number of timestamped values, or only the ones within a certain time from now. The other values are garbage collected.
There can be at most a few hundred families, each possibly with thousands of qualifiers. Tables each containing over 200TB of data are in use.
In Webtable most information significant for a URL is stored in the row named by that URL.
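
The Bigtable paper summarizes the data model as a sparse, distributed, persistent, multi-dimensional sorted map indexed by (row key, column key, timestamp). A rough in-memory analogue, ignoring sparseness, distribution, and persistence, is a nesting of sorted maps:

	#include <cstdint>
	#include <functional>
	#include <iostream>
	#include <map>
	#include <string>

	// (row key, column key, timestamp) -> value, with timestamps in decreasing order
	// so that begin() of the innermost map is the most recent version. This is only
	// an in-memory analogue of the model, not how Bigtable actually stores data.
	using Timestamp = int64_t;
	using Cell  = std::map<Timestamp, std::string, std::greater<Timestamp>>;
	using Row   = std::map<std::string, Cell>;   // "family:qualifier" -> versions
	using Table = std::map<std::string, Row>;    // row key (kept sorted) -> row

	int main() {
	    Table webtable;
	    // Row keys are reversed URLs, so pages of the same site sort next to each other.
	    webtable["com.cnn.www"]["anchor:www.c-span.org"][1100000] = "CNN homepage";
	    webtable["com.cnn.www"]["anchor:www.c-span.org"][1200000] = "CNN";
	    webtable["com.cnn.www"]["contents:"][1250000] = "<html>...</html>";

	    const Cell& anchor = webtable["com.cnn.www"]["anchor:www.c-span.org"];
	    std::cout << "latest anchor text: " << anchor.begin()->second << "\n"; // prints CNN
	}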

The row is the fundamental entity of Bigtable. Operations on a row are atomic; even better, groups of operations can be applied atomically to a row. Here are two examples of the use of a Bigtable, written in C++:

Writing to Bigtable:

	// Open the table
	Table *T = OpenOrDie("bigtable/web/webtable");

	// Write a new anchor and delete an old anchor
	RowMutation r1(T, "com.cnn.www");
	r1.Set("anchor:www.c-span.org", "CNN");
	r1.Delete("anchor:www.abc.com");
	Operation op;
	Apply(&op, &r1);

Reading from Bigtable:

	// Read all anchors of the row com.cnn.www, all timestamped versions
	Scanner scanner(T);
	ScanStream *stream;
	stream = scanner.FetchColumnFamily("anchor");
	stream->SetReturnAllVersions();
	scanner.Lookup("com.cnn.www");
	for ( ; !stream->Done(); stream->Next()) {
		printf("%s %s %lld %s\n",
			scanner.RowName(),
			stream->ColumnName(),
			stream->MicroTimestamp(),
			stream->Value());
	}
A client may request Bigtable to execute scripts within the Bigtable system itself, i.e. close to the data. These scripts are written in a language called Sawzall which can read but not modify the Bigtable store.

A table is split into tablets where each tablet is a contiguous set of rows. Tablets are the unit of distribution, they play the same role in Bigtable as chunks in GFS. As in GFS we had a master and chunkservers, in Bigtable we have a master and tabletservers. Usually a tablet is ~200MB and a tabletserver holds a few hundred tablets. If a tabletserver crashes, its tablets are distributed equally to the remaining tabletservers. A table may consist of hundreds of thousands of tablets.
A basic problem is to map a row key of a table to the tablet that contains the specified row. The indexing structure required for this is not a centralized data structure; it is instead a METADATA table, itself decomposed into tablets, that forms a 3-level hierarchical structure.
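
The essence of the lookup at each level of that hierarchy is a search in a sorted structure: METADATA rows are keyed by a table identifier and the end row key of a tablet, so finding the tablet for a given row key is a lower_bound-style search. Here is a simplified, single-level C++ sketch (the real lookup goes through the 3-level hierarchy, whose root location is kept in Chubby):

	#include <iostream>
	#include <map>
	#include <string>

	// Tablets indexed by the last row key they contain; lower_bound finds the
	// tablet holding a given row key. A simplification of the METADATA lookup.
	struct TabletLocation { std::string tabletServer; };

	std::string FindTablet(const std::map<std::string, TabletLocation>& tabletsByEndKey,
	                       const std::string& rowKey) {
	    auto it = tabletsByEndKey.lower_bound(rowKey); // first tablet whose end key >= rowKey
	    if (it == tabletsByEndKey.end()) return "no such tablet";
	    return it->second.tabletServer;
	}

	int main() {
	    std::map<std::string, TabletLocation> tablets = {
	        {"com.cnn.www", {"tabletserver-17"}},        // rows up to and including com.cnn.www
	        {"edu.temple.cis.www", {"tabletserver-3"}},
	        {"org.zzz", {"tabletserver-8"}}};
	    std::cout << FindTablet(tablets, "edu.mit.www") << "\n"; // -> tabletserver-3
	}
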
Bigtable makes use of the Chubby distributed lock service: