Wednesday, March 4, 2015

P2P Systems

Why study P2P systems?
* First distributed systems that seriously focused on scalability wrt # nodes
* Widely used in cloud computing

Widely-deployed P2P systems (Industrial P2P)
1. Napster
2. Gnutella
3. Fasttrack
4. Bittorrent

Academic P2P systems
1. Chord
2. Pastry
3. Kelips
and many more from academia

NAPSTER
* Napster clients store the actual files.
* There are several Napster clients, which are called peers
* Napster servers at napster.com store no files; they only keep information about the files and the location of the peer where each file actually resides (i.e., a directory of filenames with peer pointers).

When a client/peer comes up:
1. It connects to a napster server and uploads a list of music that it wants to share
2. Server maintains a list of <filename, ip-address, port#> tuples in the form of a ternary tree (like a binary tree, but with up to 3 children per node). The server stores no files.

When a client searches for a file:
1. It sends server the keywords to search with
2. Server searches its directory list with the keywords
3. Server returns a list of hosts <ip-address, port#> tuples to the client
4. Client pings each host in the list to find transfer rates
5. Client fetches file from best host
Except for returning the directory list, the server is not involved in any other communication between peers.
All communication uses TCP which is reliable and ordered networking protocol.

Joining a P2P system
1. Send an HTTP request to a well-known URL for that P2P service
2. Request is routed (after DNS lookup) to an introducer, a well-known server that keeps track of recently joined nodes in the P2P system
3. Introducer initializes the new peer's neighbor table

Problems:
1. Centralized server a source of congestion
2. Centralized server single point of failure
3. No security - plain text messages and passwords
4. Guilty of copyright infringement

GNUTELLA
* Eliminate servers
* Client machines search and retrieve amongst themselves
* Clients act as servers too, and are called servents
* All peers/servents are connected in an overlay graph (called an overlay since each edge is an implicit Internet path)
* Each peer stores its own files and also peer pointers

Search:
1. Gnutella routes different messages within the overlay graph
a. Query (search)
b. QueryHit (response to query)
c. Ping (to probe network for other peers)
d. Pong (reply to ping, contains address of another peer)
e. Push (for initiating file transfer)

Message Header Format:
1. Descriptor ID (16 bytes)
2. Payload descriptor (1 byte)
3. TTL (1 byte) : Decremented at each hop and message dropped when TTL=0. This is to avoid message going around indefinitely in the overlay graphs.
4. Hops (1 byte)
5. Payload length (4 bytes)
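As a rough illustration of the 23-byte header above, here is a small Python sketch that packs and unpacks the five fields in the listed order (the payload-descriptor value used for Query and the little-endian payload length are assumptions of this sketch, not a claim about the exact wire format):

    # Sketch: pack/unpack the 23-byte Gnutella descriptor header using the fields above.
    import struct
    import uuid

    HEADER_FMT = "<16sBBBI"   # DescriptorID(16), PayloadDescriptor(1), TTL(1), Hops(1), PayloadLength(4)
    QUERY = 0x80              # assumed payload-descriptor value for a Query message

    def make_header(payload_descriptor, ttl, payload_len):
        descriptor_id = uuid.uuid4().bytes           # 16-byte unique message id
        return struct.pack(HEADER_FMT, descriptor_id, payload_descriptor, ttl, 0, payload_len)

    def parse_header(raw):
        did, pd, ttl, hops, plen = struct.unpack(HEADER_FMT, raw[:23])
        return {"id": did, "payload_descriptor": pd, "ttl": ttl, "hops": hops, "length": plen}

    hdr = make_header(QUERY, ttl=7, payload_len=0)
    assert parse_header(hdr)["ttl"] == 7

A forwarding peer would decrement TTL and increment Hops before relaying, dropping the message once TTL reaches 0, as described above.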

Avoiding excessive traffic:
1. To avoid duplicate transmissions, each peer maintains a list of recently received messages
2. A Query is forwarded to all neighbors except the peer from which it was received
3. Each Query (identified by its Descriptor ID) is forwarded only once
4. A QueryHit is routed back only to the peer from which the Query with the same DescriptorID was received
5. For flooded messages, duplicates with same DescriptorID and PayloadDescriptor are dropped
6. QueryHit with DescriptorID for which Query not seen is dropped

After receiving QueryHit message:
* Requestor chooses the best QueryHit responder and initiates an HTTP GET request to the responder's ip-address:port. The GET request has a Range field to allow partial file transfers
* The Responder then replies with the file packets. However, the Responder may be behind a firewall that blocks the incoming GET request from the Requestor. Gnutella handles this with "Push" messages, used when the direct HTTP GET fails: the Push message is routed along the reverse QueryHit path, and the Responder then opens the connection outward and sends the file packets back. Note that a firewall typically blocks only incoming connections; outgoing messages go through without issue.

Problems:
1. Ping/Pong messages constituted about 50% of the traffic. Solution: multiplex (when multiple Ping/Pong messages are received, send only one out), cache, and reduce the frequency of Ping/Pong messages
2. Repeated searches with the same keywords. Solution: cache Query and QueryHit messages
3. Modem-connected hosts do not have enough bandwidth to pass Gnutella traffic. Solution: use a central server as a proxy for such peers.
4. Large number of freeloaders (peers that download files but never upload).

FASTTRACK
* Hybrid between Gnutella and Napster
* Takes advantage of "healthier" participants in the system
* Underlying technology in Kazaa, KazaaLite, Grokster
* Proprietary protocol, but some details available
* Like Gnutella, but with some peers designated as supernodes. A supernode stores a directory listing <file-name, peer-pointer> for nearby peers, similar to a Napster server
* Supernode membership changes over time
* Any peer can become and stay a supernode, provided it has earned enough reputation/contribution.
* Reputation scheme : P2PEcon
* A peer searches by contacting a nearby super node, instead of flooding Query traffic throughout overlay graph.

BITTORRENT
* Similar to other P2P networks, but provides incentives for not being freeloaders.
* Tracker per file (receives heartbeats, joins and leaves from peers)
* When a peer connects, it contacts the tracker, which returns a list of the peers that have that file.
* It then connects to those peers. A peer can be a seed (has the full file) or a leecher (has only some blocks)
* File split into blocks with block size between 32KB - 256 KB
* Download Local Rarest First block policy  : prefer early download of blocks that are least replicated among neighbors
* Incentives are provided by Tit-for-Tat bandwidth usage scheme : Provide blocks to neighbors that provided it the best download rates.
* Choking : limits the number of neighbors to which concurrent uploads are going on. Typically the <=5 best neighbors are unchoked; everyone else is choked. Periodically (~10 s) the best-neighbor list is updated, and periodically (~30 s) a random neighbor is unchoked so as to keep the unchoked list fresh.
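A minimal sketch of the Local Rarest First policy described above: given which blocks each neighbor advertises, pick a needed block that is replicated among the fewest neighbors (the data layout and names here are illustrative, not BitTorrent's wire format):

    # Sketch: Local Rarest First block selection.
    from collections import Counter

    def pick_rarest_block(neighbor_blocks, have, num_blocks):
        # neighbor_blocks: dict neighbor-id -> set of block indices that neighbor holds
        # have: set of block indices we already hold
        counts = Counter()
        for blocks in neighbor_blocks.values():
            counts.update(blocks)                        # how many neighbors hold each block
        needed = [b for b in range(num_blocks) if b not in have and counts[b] > 0]
        if not needed:
            return None                                  # nothing downloadable right now
        return min(needed, key=lambda b: counts[b])      # least-replicated needed block

    # Block 2 is held by only one neighbor, so it is downloaded first.
    print(pick_rarest_block({"A": {0, 1}, "B": {0, 1, 2}}, have=set(), num_blocks=3))   # -> 2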

CHORD
* DHT (Distributed Hash Table)
- Hash table allows you to insert, lookup and delete objects with keys. HT stores objects in buckets. O(1) for operations
- A DHT lets you do the same in a distributed setting, where objects = files; files have unique keys (like the filename). However, instead of storing objects in buckets, objects are stored at nodes/machines in a cluster.
- Performance
1. Load balancing : Just like regular HT, all buckets/nodes should be equally balanced.
2. Fault-tolerance : When nodes enter and leave the system, we don't want to lose objects
3. Efficiency of lookups and inserts
4. Locality

- Napster, Gnutella, Fasttrack and Bittorrent are kind of DHTs, but their efficiency of lookups and inserts is not that good.
- Memory, Lookup latency, #messages for lookup
a. Napster : O(1) @client/O(N)@server, O(1), O(1)
b. Gnutella: O(N), O(N), O(N)
c. Chord : O(logN), O(logN), O(logN)

* Intelligent choice of neighbors to reduce latency and message cost of routing (lookups/inserts)
* Uses Consistent hashing on node's or peer's address
- SHA-1 (ip-address, port) -> 160 bit string
- Truncated to m bits, which is called peer id. Not unique but conflicts are unlikely, especially with large m
- Can then map peers to one of the 2^m logical points on a circle
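A small sketch of that consistent-hashing step: SHA-1 the peer's (ip-address, port) and truncate the digest to m bits to get the peer id, a point on the ring of 2^m logical points (m = 7 and the choice of keeping the low-order bits are illustrative):

    # Sketch: map a peer address to an m-bit id on the Chord ring.
    import hashlib

    def peer_id(ip, port, m=7):
        digest = hashlib.sha1(f"{ip}:{port}".encode()).digest()   # 160-bit SHA-1 digest
        return int.from_bytes(digest, "big") % (2 ** m)           # keep m bits -> point on the ring

    print(peer_id("10.0.0.1", 4000))     # some point in [0, 2^7)

The same function applied to a filename (see "How are files stored?" below) yields the file's key on the same ring.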

* Peer pointers : Each peer/node maintains the following information/pointers
- Immediate successor.
- Finger tables. A finger table has m entries, and the ith entry (which is a successor peer/node id on the ring) is the first peer whose id satisfies the following rule:
id >= n + 2^i (mod 2^m)
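A sketch of how node n's finger table could be filled in, following the rule above (a tiny 2^7-point ring and an explicit list of ring members are assumed for illustration):

    # Sketch: entry i of n's finger table = first peer whose id >= n + 2^i (mod 2^m).
    def successor_of(point, ring_ids):
        for pid in sorted(ring_ids):
            if pid >= point:
                return pid
        return min(ring_ids)                  # wrap around the ring

    def finger_table(n, ring_ids, m=7):
        return [successor_of((n + 2 ** i) % (2 ** m), ring_ids) for i in range(m)]

    ring = [16, 32, 45, 80, 96, 112]
    print(finger_table(32, ring))             # [45, 45, 45, 45, 80, 80, 96]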

* How are files stored?
- Filenames also mapped using same consistent hash function
-- SHA-1 (filename) -> 160 bit string
-- The file is stored at the first peer with id greater than or equal to its key (mod 2^m)
- For example, if cnn.com/index.html maps to key 42, it is stored at the first peer/node with id >= 42 on the ring
- P2P systems can be used to store any kind of objects : html files for cooperative web caching, file sharing applications including mp3 files

- With consistent hashing and K keys and N peers, each peer stores O(K/N) keys.

* How does search work?
- At node n, send the query for key k to the largest successor or finger table entry that is <= k (going clockwise around the ring from n). If no such entry exists, send the query to successor(n).
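A sketch of this routing rule (distances are measured clockwise from n so that "largest entry <= k" makes sense on the ring; the 2^7 ring size is again illustrative):

    # Sketch: pick the next hop for a lookup of key k at node n.
    def next_hop(n, k, fingers, successor, m=7):
        size = 2 ** m
        dist = lambda a: (a - n) % size                      # clockwise distance from n
        candidates = [p for p in set(fingers) | {successor} if p != n and dist(p) <= dist(k)]
        return max(candidates, key=dist) if candidates else successor

    # At N32, a query for key 42 has no entry <= 42, so it goes to successor(N32) = N45,
    # which is the node that stores key 42.
    print(next_hop(32, 42, [45, 45, 45, 45, 80, 80, 96], successor=45))   # -> 45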

* Failures in Chord
- happen when peers fail. Routing via a finger table entry cannot make progress when that entry's node has failed. One solution is to maintain r multiple successor entries; in case of failure, route via one of these successor entries instead.
r = 2 log N successor entries suffice to keep lookups working with high probability
- The node storing the file itself might also fail. Solution: replicate the file.
- Churn : Nodes join and leave/failure
-- When a new peer joins (example: N40 joins a ring where N32's current successor is N45)
a. Introducer directs N40 to N45 (and N32)
b. N32 updates its successor to N40
c. N40 initializes its successor to N45, and initializes its fingers from N45's
d. N40 periodically talks to its neighbors to update its finger table (a stabilization protocol runs at each node to get successor and finger table entries from its successors; eventually other nodes may also update their finger table entries to include the newly joined node)
e. The keys between N32 and N40, previously stored at N45, need to be copied over to N40
-- #of messages for peer joins = O(logN)*O(logN)

* Virtual nodes
- Hashing can be non-uniform, leading to bad load balancing
- Treat each physical node as multiple virtual nodes behaving independently
- Each virtual node joins the ring independently
- Reduces variance of load imbalance

* Virtual ring and consistent hashing are used in Cassandra, Riak, Voldemort, DynamoDB, and other key-value stores
* For more information : www.pdos.mit.edu/chord/

PASTRY
* Assign ids to nodes, just like Chord (using virtual rings)
* Leaf set - each node knows its predecessor(s) and successor(s)
* Routing tables are based on prefix matching (think of a hypercube); routing takes O(log N) hops, and hops are short in the underlying network
* IDs are m bits, and routing table entries match successively longer prefixes of the ID: 1 bit matching, 2 bits matching, ..., m bits matching. This is unlike Chord routing

Pastry Locality
* For each prefix, among all potential neighbors with a matching prefix, the one with the shortest round-trip time is selected
* Since shorter prefixes have many more candidates (spread out throughout the Internet), the neighbors for shorter prefixes are likely to be closer than the neighbors for longer prefixes
* Thus in prefix routing, early hops are short and later hops are longer
* Yet overall stretch, compared to direct Internet path, stays short

KELIPS
* a DHT with constant (O(1)) lookup cost
* concept of affinity groups: k affinity groups, where k = sqrt(N)
* Each node hashed to a group (hash mod k), where hash can be SHA-1
* Node's neighbors
- All other nodes in its own affinity group (on average about sqrt(N) nodes)
- One contact node per foreign affinity group (k-1 such foreign affinity groups)
- Total = 2 sqrt(N) - 1

Kelips Files and Metadata
* A file can be stored at any node(s)
* Decouple file replication/location (outside kelips) from file querying (inside kelips)
* Each filename hashed to a group
-- All nodes in that group replicate the pointer information, i.e. <filename, file location>
-- The affinity group stores only these pointers, not the files themselves

Kelips lookups
* Lookup
-- Find file affinity group by hashing the filename
-- From the querying node's neighbor list, go to its contact for the file's affinity group (just one hop)
-- Failing that, try another of your neighbors to find a contact (a maximum of two hops)
-- Note: lookups take O(1) hops
-- Memory cost is O(sqrt(N)) : about 1.93 MB for 100K nodes and 10M files
-- Fits in RAM of most workstations/laptops today (COTS machine)
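A minimal sketch of the lookup path just described: both node addresses and filenames hash to one of the k = sqrt(N) affinity groups, and the querying node asks its contact in the file's group for the <filename, location> pointer (names and data structures here are assumptions for illustration):

    # Sketch: Kelips-style one-hop lookup.
    import hashlib, math

    def group_of(name, k):
        return int(hashlib.sha1(name.encode()).hexdigest(), 16) % k      # hash mod k

    def lookup(filename, contacts, k):
        # contacts: our partial view, group-id -> one contact node in that group
        g = group_of(filename, k)
        contact = contacts.get(g)
        if contact is None:
            return None            # would then ask another neighbor for a contact (second hop)
        return contact             # this node holds the <filename, file location> pointer

    N = 10_000
    k = int(math.sqrt(N))                                  # 100 affinity groups
    contacts = {g: f"node-{g}" for g in range(k)}          # illustrative contact list
    print(lookup("cnn.com/index.html", contacts, k))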

Kelips Soft State
* Membership lists
-- Gossip-based membership, within each affinity group and also across affinity groups
-- O(log N) dissemination time
-- Unlike Chord and Pastry, which have to fall back on other successors/predecessors in the event of an immediate successor's failure, a Kelips lookup has many more options: it can ask other nodes within its own affinity group for a contact in the required affinity group, or it can ask a node in a different affinity group

* File metadata
-- needs to be periodically refreshed, gossip-style, from the source node; otherwise the file metadata times out and is eventually removed from the system. The advantage is that when a file is deleted, it doesn't need to be explicitly removed from all the nodes in the affinity group

* Slightly higher memory and background bandwidth cost, but lookup cost is O(1)

Friday, February 13, 2015

Grid Applications and Infrastructure

Example: RAMS (Rapid Atmospheric Modeling System)

* Modeled a mesoscale convective complex that dropped a very large amount of rain; results were in good agreement with recorded data
* used 5 km grid spacing instead of the usual 10 km
* ran on 256+ processors

HPC (High Performance Computing) : Computation-intensive computing
HPC applications typically use a lot of CPU; they handle less data than data-intensive applications, but are far more compute-intensive.

The Grid makes it possible to run such programs without access to a supercomputer.

There may be several jobs in such applications, some of which can run in parallel.
All jobs in an application are represented as a DAG (Directed Acyclic Graph)
Example : the output of Job0 serves as input to Job1 and Job2; the outputs of Job1 and Job2 serve as input to Job3
So Job1 and Job2 can be executed in parallel on different workstations (see the sketch below).
These jobs generally involve several GBs of data, but are mostly compute-intensive
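The example DAG above can be written down as a dependency map; a scheduler then repeatedly launches every job whose inputs are ready, which is what lets Job1 and Job2 run in parallel. A toy sketch (job names follow the example; actual dispatch to workstations is omitted):

    # Sketch: run the example DAG wave by wave; each wave's jobs can run in parallel.
    deps = {"Job0": [], "Job1": ["Job0"], "Job2": ["Job0"], "Job3": ["Job1", "Job2"]}

    done = set()
    while len(done) < len(deps):
        ready = [j for j, d in deps.items() if j not in done and all(p in done for p in d)]
        print("run in parallel:", ready)      # wave 1: Job0; wave 2: Job1, Job2; wave 3: Job3
        done.update(ready)                    # in a real grid these would be dispatched to sites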

Each job may take several hours/days

Stages of a job

1. Init
2. Stage in
3. Execute
4. Stage out
5. Publish

Computation intensive, so massively parallel

The main question is the allocation and scheduling of tasks among the grid resources (distributed workstations).

Scheduling problem

- DAG (Directed Acyclic Graph) of jobs to be scheduled across multiple sites

2-level scheduling infrastructure

1. Intra-site protocol

a. Internal allocation and scheduling (Which of the jobs run on which machines)
b. Monitoring (If a job fails, it needs to be started on another machine on the same site)
c. Distribution and publishing of files

Example: HTCondor protocol (High-Throughput computing system from U. Wisconsin Madison)
Belongs to a class of Cycle-scavenging systems which
* run on a lot of workstations
* when workstation is free, ask site's central server (or Globus) for tasks
* If the user hits a keystroke or clicks the mouse, stop the task, either by killing it or by asking the server to reschedule it
* can also run on dedicated machines

2. Inter-site protocol (Globus protocol)

Internal structure of different sites may be transparent (invisible) to Globus
It doesn't do the actual scheduling on the machines.
It basically does only external allocation and scheduling
Also responsible for stage in and stage out of files

* Globus Alliance involves universities, national US research labs and some companies
* Standardized several things, especially software tools
* Separately, but related: Open Grid Forum
* Globus Alliance has developed the Globus toolkit (contains standard tools to run the inter-site protocol)

Globus Toolkit

* open source
* consists of several components
- GridFTP : Wide-area transfer of bulk data
- GRAM5 (Grid Resource Allocation Manager) : submit, locate, cancel and manage jobs
% not a scheduler
% Globus communicates with the schedulers in intra-site protocols like HTCondor or Portable Batch Systems (PBS)
- RLS (Replica Location Service): Naming service that translates from a file/dir name to a target location (or another file/dir name)
- Libraries like XIO to provide a standard API for all Grid IO functionalities
- Grid Security Infrastructure (GSI)

Security Issues

* Important in Grids because they are federated i.e. no single entity controls the entire infrastructure
* Single Sign-on : collective job set should require once-only user authentication
* Mapping to local security mechanisms : some sites use Kerberos, others use Unix security
* Delegation: credentials to access resources inherited by subcomputations, eg job0 to job1
* Community authorization: eg third party authentication

* The above are important in clouds as well, but less so because clouds are typically run under a central control
* In clouds the focus is on failures, scale, on-demand access

Summary:

1. Grid computing focuses on computation-intensive computing (HPC)
2. Though often federated, the architecture and key concepts have a lot in common with those of clouds. Grid jobs need to work in concert and need a lot of coordination, while clouds are optimized for independent work that is coordinated much less frequently. Clouds don't need as much tight control over what's going on as Grids.
3. Are Grids/HPC converging towards clouds? Eg: compare OpenStack and Globus
They differ in the standards being developed and the conferences where papers get published.
Exercise: look into OpenStack and Globus to see what is similar and what is different.

Group membership in datacenters and cloud environments

In datacenters, failures are the norm not the exception.
Say the rate of failure of one machine (OS/disk/motherboard/network, etc) is once every 10 years (120 months) on average
With 120 servers in the datacenter, the mean time to the next failure somewhere in the datacenter is 120 months / 120 = 1 month.
With 12,000 servers it drops to about 7.2 hours, and with 120,000 servers to about 0.72 hours (roughly 43 minutes).

Hence the need for a failure detector. Failing to detect failures can lead to
1. Data loss
2. inconsistency in system
3. Loss of revenue
4. closure of data center

Target settings for failure detection:
1. Process group-based systems (clouds/datacenters, replicated servers, distributed databases)
2. Crash-stop/Fail-stop process failures. Once a process (a member of the group) fails, it stops executing any instructions from that point onward. This is different from the crash-recovery model, in which processes can recover and rejoin with different identifiers

Group Membership Service

Each application process (pi) maintains a membership list. This membership list is created and maintained (entries added and removed) by the membership protocol, and is queried by higher layers such as gossip, overlays, DHTs, etc.
The main challenge for a membership protocol is that communication happens over an unreliable network.


Two sub-protocols

1. Failure detector : Mechanism that detects failures
2. Dissemination : Mechanism that notifies failures to other processes once detected; can also handle process joins and process leaves in a system

When one process crashes in a group of processes, at least one non-faulty process detects the failure and disseminates it to the other member processes in the group, so that they can update their membership lists

Types of membership lists:

1. Complete list all the time across all the processes in the group (strongly consistent) : Eg. virtual synchrony
2. Almost-complete list (weakly consistent) : Eg. Gossip-style, SWIM, etc
3. Partial-random list Eg. SCAMP, T-MAN, Cyclon, etc

Failure Detectors

Remember: we have a large group of processes, scalability is the goal, the same code runs at each process, and communication is over an unreliable network

1. pj crashes
Nothing we can do about it; it's a frequent occurrence. It is the common case rather than the exception, and its frequency goes up linearly with the size of the datacenter.

2. Failure detectors desirable properties
a. Completeness (correctness property): Each failure is detected eventually by a non-faulty process
b. Accuracy (correctness property): There is no mistaken detection

The above two are impossible to guarantee together over a lossy network. The failure-detection problem is related to another well-known problem in distributed systems, the "consensus" problem.
In the consensus problem, a group of processes tries to decide on the value of a single bit; a well-known proof shows that this is impossible to solve over a lossy, arbitrarily delayed network.

In real life, completeness is guaranteed (100%) but accuracy is partial/probabilistic guarantee (nearly 100% but never 100%)
This is the right choice, since all failures need to be detected. False positives, caused by the accuracy trade-off, only incur the overhead of falsely removing a process, which then rejoins with a different identifier. This is still better than the other way around (100% accuracy with nearly-100% completeness), since then some failures might go undetected.

c. Speed (performance property): Time to first detection of failure
d. Scale (performance property): Equal load on each member and no bottlenecks or SPOF. Low overall network message load

All the desirable properties are required in spite of arbitrary simultaneous process failures

Failure detection protocols

1. Centralized Heartbeating 

a. Heartbeat is a seq no. which is incremented (i++)
b. All the processes send heartbeats periodically to a central process
c. The central process maintains list of processes and heartbeats
d. If central process doesn't receive heartbeat from a process within a timeout, it marks the process as failed

Disadvantages:
- SPOF of central process (violates 100% complete desirable property)
- Central process is highly overloaded

2. Ring Heartbeating

All processes are in a ring-like structure and each process sends heartbeat to its left and right neighbors.
If a neighbor process doesn't receive heartbeat from the process within a specified timeout, it marks the process as failed

It's better than centralized heartbeating since it reduces load on a single process.

Disadvantages:
- It is unreliable under multiple simultaneous failures, e.g. when 3 consecutive processes in the ring fail; it is possible that some failures go undetected (violates the 100% completeness property)
- This approach has the overhead of repairing the ring in the event of failures

3. All-to-all Heartbeating

Each process sends heartbeat to all the processes in the system
Advantage is it has equal load per member, though the overall load is high.
It is 100% complete since no failures go undetected.
However, if one of the processes is slow in receiving heartbeats, it may incorrectly mark a lot of other processes as failed.
Hence the need to make the all-to-all heartbeating protocol more robust, to increase the probabilistic guarantee of accuracy.

4. Gossip-style failure detection (a variant of all-to-all heartbeating, but more robust)

In addition to above all-to-all heartbeating semantics, each node maintains a membership list/table consisting of the <pid, seq#, local time when seq was received from pid>.
Periodically, this local membership list/table is sent to "b" targets similar to the gossip protocol.
On receipt of membership list at the target, it updates its local membership list (updating the entries for pid, if they are more recent than what it has).
For example, in a 4 process system, let's assume
P1 membership list : <{1, 10120, 66}, {2, 10103, 62}, {3, 10098, 63}, {4, 10111, 65}>
P2 membership list : <{1, 10118, 64}, {2, 10110, 64}, {3, 10090, 58}, {4, 10111, 65}>

If P1 sends its membership list to P2, on receipt P2 updates its membership list as follows
<{1, 10120, 70}, {2, 10110, 64}, {3, 10098, 70}, {4, 10111, 65}>

Note: current time is 70 at P2 (asynchronous clocks)

If a process's entry in the membership list has not been updated for more than a threshold of T-fail seconds of local time, that process is marked as failed by the protocol.
However, the failed entry is not immediately deleted from the list; only after a further T-cleanup seconds (typically equal to T-fail) is the member deleted from the list.
Need for two different timeouts (T-fail and T-cleanup):
Due to the nature of the gossip protocol, it is possible that one process deletes an entry from its list but is then chosen as a gossip target by another process that still has the entry, causing the deleted entry to be incorrectly re-added.
Two processes could ping-pong the failed entry between them indefinitely, retaining it without the failure ever being cleaned up. Hence, to avoid this ping-pong effect, the entry is kept (marked failed) for another T-cleanup seconds instead of being deleted immediately.
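A compact sketch of the receive-side logic just described: merge a received list (keeping only fresher heartbeats), then classify entries using T-fail and T-cleanup. The data layout and timeout values are illustrative; the example at the end reproduces the P1/P2 merge shown above.

    # Sketch: gossip-style membership list merge plus T-fail / T-cleanup handling.
    T_FAIL, T_CLEANUP = 30, 30          # illustrative values (T-cleanup typically equals T-fail)

    def merge(local, received, now):
        # local/received: dict pid -> (heartbeat_seq, local_time_of_last_update)
        for pid, (seq, _) in received.items():
            if pid not in local or seq > local[pid][0]:
                local[pid] = (seq, now)                 # fresher heartbeat: record it at local time

    def classify(local, now):
        failed, removable = set(), set()
        for pid, (_, t) in local.items():
            if now - t > T_FAIL + T_CLEANUP:
                removable.add(pid)                      # safe to delete the entry now
            elif now - t > T_FAIL:
                failed.add(pid)                         # marked failed, kept to absorb stale gossip
        return failed, removable

    # The example above: P2 (local time 70) merges P1's list and keeps the fresher entries.
    p2 = {1: (10118, 64), 2: (10110, 64), 3: (10090, 58), 4: (10111, 65)}
    merge(p2, {1: (10120, 66), 2: (10103, 62), 3: (10098, 63), 4: (10111, 65)}, now=70)
    print(p2)    # {1: (10120, 70), 2: (10110, 64), 3: (10098, 70), 4: (10111, 65)}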

What happens if gossip period T-gossip is decreased?
Detection time decreases, but bandwidth/messages increase.

What happens to P-mistake(false positive rate) as T-fail,T-cleanup is increased?
Detection time increases, but P-mistake decreases because we give slightly longer time for non-faulty nodes heartbeats to go across.

Tradeoff: False positive rate (P-mistake) vs. detection time (T-gossip, T-fail, T-cleanup) vs. bandwidth

Analysis:
- All-to-all heartbeating : load per node = N/T (each node sends a heartbeat to all N members every T time units)
- Gossip-variant of all-to-all heartbeating : load per node = N/tg, where tg is the gossip period; since detection takes O(log N) gossip rounds, this works out to L = N log N / T. Gossip load is higher than all-to-all, but accuracy is better.
- The optimal load L is independent of N, but both of the above depend on N and hence are sub-optimal. They:
* have L = O(N/T)
* try to achieve simultaneous detection at all processes
* fail to distinguish the Failure Detection and Dissemination components

Key:
- separate the two components
- use a non heartbeat-based Failure detection component

5. SWIM (Scalable Weakly-consistent Infection-style Membership) Failure Detector Protocol : Probabilistic Failure Detector

a. Instead of heart-beating, we use pinging in each protocol period (T time units)
b. Stage 1: Process pi picks a random process pj and sends ping. Process pj responds back with an ACK
c. If either the original ping (pi -> pj) or the ACK (pj -> pi) is lost, pi tries to reach pj again, this time by sending ping-requests to "k" other random processes, which ping pj indirectly and relay the ACK back to pi.
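A sketch of one SWIM protocol period at process pi, with the network calls stubbed out (the value of k and the function names are illustrative):

    # Sketch: one SWIM protocol period at pi -- direct ping, then k indirect ping-requests.
    import random

    K = 3      # number of indirect-ping helpers (illustrative)

    def protocol_period(members, ping, ping_req):
        # members: the other processes; ping(p) / ping_req(helper, target) return True if an ACK arrives in time
        pj = random.choice(members)
        if ping(pj):
            return (pj, "alive")                         # direct ping succeeded
        helpers = random.sample([m for m in members if m != pj], min(K, len(members) - 1))
        if any(ping_req(h, pj) for h in helpers):
            return (pj, "alive")                         # some helper reached pj and relayed the ACK
        return (pj, "suspect")                           # handed to the dissemination/suspicion component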

Time-bounded completeness
- key : select each membership element once as a ping target in a traversal
* round-robin pinging
* random permutation of list after each traversal

- Each failure is detected in worst case 2N-1 (local) protocol periods
- Preserves Failure Detection properties


Dissemination and Suspicion

Dissemination options

1. Multicast (Hardware/IP)

- unreliable
- not enabled in all routers/switches
- multiple simultaneous multicasts especially when multiple processes detect the same failure almost at the same time

2. Point-to-point (TCP/UDP)

- expensive especially when there are thousands of processes

3. Zero extra messages (SWIM failure detection): Piggyback on failure detector messages

- infection-style dissemination with ping, ack messages
* Epidemic/gossip style dissemination
* maintain a buffer of recently joined/evicted processes (piggyback from this buffer; prefer recent updates)
* Buffer elements are garbage collected after a while

Suspicion Mechanism

1. False detections due to perturbed processes, packet losses (from congestion)
2. Indirect pinging may not solve the problem (eg. correlated message losses near pinged host)
3. Key idea: suspect a process before declaring it failed in the group. If, while suspect messages are being disseminated, the process turns out to be alive, it is marked alive again.

- Distinguish multiple suspicions of a process

* per-process incarnation number
* the inc# for pi can be incremented only by pi itself (it does so when it receives a <suspect, pi> message about itself)
* somewhat similar to DSDV routing protocol

Rules of incarnation# and "alive, suspect, failed" state machine

- A notification with a higher inc# overrides notifications with lower inc#s
- Within the same inc# : (suspect, inc#) overrides (alive, inc#)
- (failed, inc#) overrides everything else
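These override rules amount to an ordering on (state, inc#) notifications about a process; a tiny sketch of "which notification wins" (the tuple representation is an illustration, not SWIM's message format):

    # Sketch: which of two notifications about the same process takes precedence.
    RANK = {"alive": 0, "suspect": 1, "failed": 2}

    def stronger(a, b):
        # a, b: (state, incarnation#) pairs, e.g. ("suspect", 7)
        for n in (a, b):
            if n[0] == "failed":
                return n                                  # failed overrides everything else
        if a[1] != b[1]:
            return a if a[1] > b[1] else b                # higher inc# overrides lower inc#
        return a if RANK[a[0]] >= RANK[b[0]] else b       # same inc#: suspect overrides alive

    print(stronger(("alive", 8), ("suspect", 7)))   # ('alive', 8)  -- higher inc# wins
    print(stronger(("alive", 7), ("suspect", 7)))   # ('suspect', 7) -- suspect wins within an inc#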

Wrap Up

1. Failures are the norm, not the exception in datacenters
2. Every distributed system uses a failure detector
3. Many distributed systems use a membership service
4. Ring failure detection underlies IBM SP2 and many other similar clusters/machines
5. Gossip-style failure detection underlies Amazon EC2/S3 (rumored)

Thursday, February 12, 2015

Gossip Protocols

Gossip/Epidemic Protocols

The problem these protocols solve is the Multicast problem

Multicast

1. Get information out to the other members of a group (only a few nodes in the entire network), in contrast to broadcast, which sends information to everyone (all nodes) on the network.
2. Fault tolerance and Scalability
a. Nodes may crash
b. Packets may be dropped
c. Scale to 1000's of nodes, with overhead per node which doesn't grow very quickly as number of nodes grows into 1000s or even tens of thousands

The protocol executed at each of the nodes, including the sender, is called the multicast protocol.
Multicast protocol typically sits at the application layer (not the network layer).
Network layer has IP multicast and is implemented by routers and switches, but typically applications don't rely on it since not all switches/routers might implement it.

Multicast protocols:

1. Centralized : the sender sends UDP/TCP packets directly to all the receiver nodes in the group.
Simplest implementation, but issues with fault tolerance (sender is single point of failure) and sender overhead affecting scalability and hence latency

2. Tree-based : IP multicast, SRM, RMTP, TRAM, TMTP.
Sender sends to a few nodes and those nodes in turn send to other nodes.
The sender is not a single point of failure and doesn't have the overhead of sending the packet to all receivers, which helps scalability and latency, especially when the tree is well balanced.
Issues : setting up and maintaining the tree, both initially on bringup and on node failures (especially failures of non-leaf nodes, which are responsible for forwarding to all nodes in their subtree)
a. Build a spanning tree among the processes of the multicast group
b. Use spanning tree to disseminate multicasts (might use IP multicasts for dissemination)
c. Use either ACKs or NAKs to repair multicasts not received
d. SRM (scalable Reliable Multicast)
   - Uses NAKs
   - But adds random relays, and uses exponential backoff to avoid NAK storms
e. RMTP (Reliable Multicast Transport Protocol)
   - Uses ACKs
   - But ACKs only sent to designated receivers, which then re-transmit missing multicasts
f. These protocols still cause an O(n) ACK/NAK overhead. So as the number of nodes increases, the ACK/NAK overhead increases linearly, and hence these protocols are not as scalable as they are often thought to be. Hence the need for gossip-based (epidemic) protocols.

The Gossip Protocol

1. The sender periodically (every 5 secs or so) transmits a gossip message (using UDP) to "b" (the gossip fan-out, typically b=2) random targets.
2. Once a receiver receives a gossip message, it is said to be infected, and it too periodically sends gossip messages to "b" random targets. It is possible for a node to receive the same multicast message from different senders; in that sense the gossip protocol uses a slightly higher number of messages, but the overhead is negligible compared to the advantages.
3. The gossip protocol is not synchronized across nodes; each node runs it on its own local clock.
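A toy simulation of push gossip with fan-out b = 2 (synchronous rounds are assumed purely to make the counting easy; real gossip runs on unsynchronized local clocks as noted above):

    # Toy simulation: count push-gossip rounds until every node is infected.
    import random

    def push_gossip_rounds(n, b=2):
        infected = {0}                       # node 0 is the original sender
        rounds = 0
        while len(infected) < n:
            new = set()
            for _ in infected:
                new.update(random.sample(range(n), b))   # each infected node gossips to b random targets
            infected |= new
            rounds += 1
        return rounds

    print(push_gossip_rounds(10_000))        # grows roughly like log n, as analyzed below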

Types of Gossip protocols:

1. Push gossip : Once you have a multicast message, you start gossiping about it. If there are multiple messages, gossip a random subset of them, or recently-received ones, or higher priority ones
2. Pull gossip : Periodically poll a few randomly selected processes for new multicast messages that you haven't received and get those messages. In this type, all nodes (infected or uninfected) keep sending gossip messages
3. Hybrid variant (push-pull) : when the pull query goes out, the reply also carries other recently received multicast messages (push), making it a "push-pull" variant.

Gossip Analysis:

Properties of simple push protocol
1. Lightweight in large groups
2. Spreads a multicast quickly (latency) : O(log n)
3. Highly fault-tolerant (reliable) : Gossip protocol is similar to how rumors spread in the society, diseases spread in human population and how virus/worm spread in computer systems. Once the entity is out, it's hard to contain or kill it.

In all forms of gossip (push or pull), it takes O(log n) rounds before about n/2 nodes get the gossip, because that is the fastest a message can spread: a dissemination tree with constant fanout needs O(log n) levels to cover n nodes.
Thereafter, pull gossip is faster than push gossip, because the gossip in subsequent rounds spreads super-exponentially (not just exponentially). The second half of pull gossip finishes in time O(log(log n)).

Topology-aware gossip is needed when gossip crosses subnets or racks within a datacenter: with topology-unaware random target selection, the load on the connecting router would be O(n).
The fix is to gossip to nodes within the same subnet/rack with higher probability and to inter-subnet/inter-rack nodes with lower probability. Once the gossip message crosses into a subnet, it gossips within that subnet with higher probability.
Thus the router load becomes O(1), while the dissemination time remains O(log n).

Gossip Implementations

1. Clearinghouse and Bayou projects by Xerox : email and database transaction systems [PODC 87]
2. refDBMS system [Usenix 94]
3. Bimodal Multicast
4. Sensor and wireless networks
5. AWS EC2 and S3 cloud (rumored, no design details published)
6. Cassandra key-value store (and others) use gossip for maintaining membership lists
7. Usenet NNTP (Network News Transport Protocol)

In summary, Gossip is a solution to multicast problem. There are centralized and tree-based solutions too, but gossip scales much better and is more fault-tolerant than other solutions. 

Sunday, February 8, 2015

Map Reduce

What is MapReduce?

Map and Reduce terms are borrowed from functional languages like Lisp
Map is a metafunction which applies a function to a list and produces another list as output
Map processes each record sequentially and independently

Reduce is a metafunction which applies a function to a group of records in batches

Eg: Sum of squares
Map applies square function while reduce applies add function

Eg: Wordcount
Map processes individual records to generate intermediate key/value pairs
input: <filename, file text>
output: <key, value> pairs

The input dataset can be split/sharded across multiple Map tasks, so that a large number of individual records are processed in parallel to generate the intermediate key/value pairs

Reduce processes and merges all intermediate values associated per key
input: <key, value> pairs. Since multiple Map tasks process independently, there can be multiple records with the same key
output: <key, value> pairs with unique key

Reduce can be parallelized across multiple reduce tasks by partitioning the keys: each key is assigned to exactly one reduce task, and the reduce tasks process and merge all intermediate values for their keys in parallel

Popular Partitioning of keys for reduce tasks is Hash partitioning 
i.e. key is assigned to reduce task # = hash(key) % number of reduce servers
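Putting the WordCount example and hash partitioning together, here is a compact single-process sketch of the data flow (function names and the tiny inputs are illustrative; a real framework would run the map and reduce tasks on different machines):

    # Sketch: WordCount map/reduce with hash partitioning of the intermediate keys.
    from collections import defaultdict

    def map_task(filename, text):
        return [(word, 1) for word in text.split()]            # intermediate <key, value> pairs

    def partition(key, num_reduce_tasks):
        return hash(key) % num_reduce_tasks                    # which reduce task owns this key

    def reduce_task(key, values):
        return (key, sum(values))                              # merge all values for one key

    shards = [("f1", "the cat sat"), ("f2", "the dog sat")]
    buckets = defaultdict(lambda: defaultdict(list))           # reduce task -> key -> [values]
    for fname, text in shards:                                 # "map phase"
        for k, v in map_task(fname, text):
            buckets[partition(k, 2)][k].append(v)
    for r in buckets:                                          # "reduce phase": one task per bucket
        print(r, [reduce_task(k, vs) for k, vs in buckets[r].items()])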

MapReduce was invented by Google, but Google's implementation was not open-sourced.
Yahoo built an open-source implementation of MapReduce, which became Apache Hadoop.
Hortonworks was spun off from Yahoo and does a lot of the Hadoop code development.

MapReduce Examples

1. Distributed Grep

i/p : large set of files
o/p : lines that match pattern
map : emits a line if it matches the supplied pattern
reduce : copies the intermediate data to o/p

2. Reverse Web-link graph

i/p : Web graph in the form tuples (a,b) where page a -> page b
o/p : for each page, the list of pages that link to it, i.e. all pages a that point to page b
map : process web log and for each i/p <source, target>, it outputs <target, source>
reduce : emits <target, list(source)>

3. Count of URL access frequency

i/p : log of accessed URLs eg: proxy server
o/p : for each URL, % of total accesses for that URL
map : process web log and outputs <URL,1>
reduce : (multiple) emits <URL, URL_count>

The above is similar to WordCount, but we need % of total accesses for each URL. Hence we need to chain another MapReduce job
map : processes <URL, URL_count> (o/p of above MapReduce job) and outputs <1, (<URL, URL_count>)>
The reason for setting the key as 1 is so that there is only one Reduce task
reduce : (only one) sums up the URL_counts to calculate overall_count in one pass, and emits multiple <URL, URL_count/overall_count> pairs in a second pass

4. Sort

In the implementation of MapReduce, by default
map task's o/p is sorted (eg: quicksort)
reduce tasks's i/p is sorted (eg: mergesort)
i/p : series of (key, value) pairs
o/p : sorted <value>s

map :  (key, value) -> (value, _) : Sorted by default engine
reduce : (key, value) -> (key, value) : Sorted by default engine
However, the partitioning function should be a range partition instead of a hash function, so that each reduce task is assigned a contiguous key range and the concatenated reduce outputs are globally sorted
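A tiny sketch of such a range-partitioning function: reducer i receives one contiguous key range, so concatenating the (individually sorted) reducer outputs yields a globally sorted result. In practice the split points would be chosen by sampling the input; here they are hard-coded for illustration.

    # Sketch: range partitioning for the Sort example.
    import bisect

    def range_partition(key, split_points):
        # split_points: sorted boundaries, e.g. ["g", "p"] -> reducers for [..g), [g..p), [p..)
        return bisect.bisect_right(split_points, key)

    print(range_partition("apple", ["g", "p"]))   # -> reducer 0
    print(range_partition("kiwi",  ["g", "p"]))   # -> reducer 1
    print(range_partition("zebra", ["g", "p"]))   # -> reducer 2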

MapReduce Scheduling

1. Parallelize map - determine # map tasks and how dataset is split or sharded
2. Transfer data from map to reduce - partitioning function
3. Parallelize reduce - schedule reduce tasks
4. Implement storage for map input, map output, reduce input and reduce output

Ensure "barrier" between Map phase and Reduce phase i.e. ensure than no Reduce starts before all Maps are finished

YARN Scheduler:

Used in Hadoop 2.x+
Yet Another Resource Negotiator
Treats each server as a collection of containers (Container = some CPU + some memory)
Has 3 main components
1. Global Resource Manager (RM) for scheduling the containers to application tasks/jobs
2. Per-server Node Manager (NM) - daemon and server-specific functions
3. Per-application (per-job) Application Master (AM) - negotiates containers with the RM and NMs; detects task failures of that job

MapReduce Fault-tolerance

1. Server failure

a. NM heartbeats to RM. If server fails, RM notifies all affected AMs and AMs take action
b. NM keeps track of each task running at its server. If task fails while in-progress, mark the task as idle and restart it
c. AM heartbeats to RM. On failure, RM restarts AM which then syncs up with its running tasks

2. RM failure : Use old checkpoints and bring up secondary RM

3. Heartbeats are also used to piggyback container requests, thus avoiding extra messages


Slow Servers or stragglers

Due to bad disk, network bandwidth, CPU or memory
This is handled by keeping track of the progress of each task belonging to the same job. If any task is detected to be a straggler, it is speculatively replicated on another server, in the hope that the replica gets lucky and finishes faster. The task is considered done when the first replica completes; the other replica is then killed.

Locality

Single cloud has hierarchical topology;
GFS/HDFS stores 3 replicas of each chunk (e.g. 64 MB in size) - 2 on one rack, 1 on a different rack
MapReduce attempts to schedule a map task in the following order:
a. on a machine that contains a replica of the corresponding input data
b. failing that, on the same rack as a machine containing the input data
c. anywhere

Attributes of cloud computing - scale

1. Servers
2. SLAs, latency
3. Performance

Open Source
Contributed by Yahoo : Hadoop, ATS (Apache Traffic Server)
Yahoo also uses and contributes to technologies brought in from outside : Storm, HBase, Hive

Wednesday, February 4, 2015

Introduction

Many Cloud Providers:

1. AWS
a. EC2 : Elastic Compute Cloud
b. S3  : Simple Storage Service
c. EBS : Elastic Block Storage, accessed by EC2 instances

2. Microsoft Azure
3. Google Compute Engine
4. Rightscale, Salesforce, EMC, Gigaspaces, 10gen, Datastax, Oracle, VMWare, Yahoo, Cloudera, etc

Two Categories:

1. Private cloud : accessible only to company employees
2. Public cloud : service to paying customer

Advantages:

* Cloud computing is useful to save money and time to bring up new compute and/or storage instances
- A new server can be up and running in 3 minutes, unlike 7.5 weeks to deploy a server internally.
  A 64-node Linux cluster can be online in 5 minutes, unlike 3 months if deployed internally

- Reduction in IT operational costs by roughly 30%

- A private cloud of virtual servers inside datacenter has saved Sybase $2 million annually because the company can share computing power and storage resources across servers

- Startups can harness large computing resources without buying their own machines

What is a cloud?

Cloud = Lots of storage + compute cycles nearby
Compute is brought closer to data rather than data being moved closer to compute

1. A single-site cloud (aka "Datacenter") consists of
a. Compute nodes (grouped into racks)
b. Switches, connecting the racks - many top of rack switches are connected to one core switch in a 2-level network topology
c. A network topology, e.g. hierarchical
d. Storage (backend) nodes connected to the network
e. Front-end for submitting jobs and receiving client requests
f. Software services

2. A geographically distributed cloud consists of multiple such sites, each site perhaps with a different structure and services

History:

1. First data centers (1940-1960) - ENIAC, ILLIAC - they occupied entire halls
2. Time sharing companies and data processing industry - punch cards as i/p and o/p (Honeywell, IBM, Xerox)
3. Clusters/Grids (1980-2012) - personal computers and workstations clustered together (Berkeley NOW Project), supercomputers (Cray), server farms (e.g. Oceano), P2P (BitTorrent), Grids (GriPhyN)
4. Clouds and datacenters (2000 - present) - similar to the time-sharing/data-processing era, but with different workloads

Technology Trends 

1. Moore's law : CPU compute capacity doubles every 18 months; earlier it was CPU frequency, now it is number of cores
2. Storage doubles every 12 months
3. Bandwidth doubles every 9 months

User Trends

Biologists are producing PB/year of data which needs to be stored and processed


Prophecies:

1. Computer facility operating like a utility (power or water company)
2. Plug your thin client into the computing utility and play your favorite Intensive Compute and Communicate Application
Unix is a precursor for this vision


What's new in today's clouds?

1. Massive scale : Large datacenters
2. On-demand access : Pay-as-you-go, no upfront commitment
3. Data-intensive nature : TBs, PBs, XBs - daily logs, forensics, web data, compressed data
4. New cloud programming paradigms : MapReduce/Hadoop, NoSQL/Cassandra/MongoDB, etc

I. MASSIVE SCALE

Power is either off-site (hydro-electric or coal) or on-site (solar panels)
WUE = Annual Water Usage / IT Equipment Energy
PUE = Total Facility Power / IT Equipment Power

II. ON-DEMAND ACCESS : *AAS CLASSIFICATION

1. HaaS : Hardware as a Service - access to barebones hardware machines; but security risk
2. IaaS : Infrastructure as a Service - does not have the security holes of HaaS. Flexible computing and storage infrastructure; virtualization is a way of achieving this. Eg: AWS
3. PaaS : Platform as a Service - flexible computing and storage infrastructure, coupled with a software platform (not in terms of VMs); easier to use but less flexible than IaaS. Eg: Google AppEngine/Compute Engine
4. SaaS : Software as a Service - access to software services, when you need them. Eg: Google docs, MS Office on demand

III. DATA-INTENSIVE COMPUTING

1. Computation-Intensive computing - MPI-based, high-performance computing, Grids; typically supercomputers
2. Data-Intensive - store data at datacenters, use compute nodes nearby since movement of enormous amount of data would unnecessarily consume a lot of bandwidth, compute nodes run computation services; CPU utilization no longer the most important resource metric, instead I/O (disk and/or network) is.

IV. NEW CLOUD PROGRAMMING PARADIGMS

Easy to write and run highly parallel programs in new cloud programming paradigms
1. Google : MapReduce and Sawzall
2. Amazon : Elastic MapReduce service
3. Yahoo : Hadoop + Pig, WebMap
4. Facebook : Hadoop + Hive

Economics of Clouds:

2 categories of clouds - public vs private clouds
Outsource or Own?
Do a cost analysis and determine the break-even point based on how long the cloud/service will be operational