Pi

Distributed Decentralised App Substrate

Origins

Pi came about as a result of what was, at the time, a strong desire by a major global telco to explore new, game-changing ways of delivering combined software, infrastructure and network services in the cloud.

Clearly an ambitious and demanding challenge, it was one that would stand or fall on the ability to:

  1. rapidly develop and take to market entirely new infrastructure software services and capabilities,
  2. at scale,
  3. with resilience and service levels matching those of leading cloud providers,
  4. whilst remaining competitive in terms of total cost of ownership

Seeking to combine these four corners of design space is of course challenging and must involve tradeoffs. The initial focus of our development effort - providing an EC2-compatible elastic compute platform for internal use - provided ample opportunity to define, explore and evolve these tradeoffs. It meant providing a way to deploy a distributed, resilient control and automation software layer across many thousands of machines.

Approach

Architecturally, this context quickly led us to look beyond 'mainstream' platform design approaches. There was a clear need for fine-grained control of huge amounts of distributed hardware, and co-ordination of its resources. As a minimum, we were going to have to deploy a software agent on each of the machines, so decisions became more focused on whether and to what extent to utilise this distributed substrate for co-ordination and resource management. The alternative would have been to build a set of centralised services using established service oriented patterns, storing and gathering data centrally and controlling the resources remotely.

We opted for the former approach, fully aware that it would likely be more challenging to implement, impose a generous helping of paradigm shifts upon its developers and maintainers, and be likely to get us to the edge of uncharted waters. What ultimately pushed us down this path was the realisation that a good balance between our four goals would depend on simplicity, automation and consistency. A significant maintenance and management burden would risk killing the commercial proposition, now or in the future. Manual intervention was to be eliminated. We could not afford to solve problems of resilience and scale in multiple ways or rely on costly specialised infrastructure. We were keenly aware of the incompatibilities between fast time to market and a multitude of large teams or traditional techniques for deploying and managing infrastructure services.

And ultimately, our goal was to help our organisation change the game, not stick to the rules!

A number of successfully deployed decentralised architectures, above all Skype, BitTorrent and Amazon's Dynamo, provided extremely useful reference points in design space, as well as an illustration of what was possible not only in the data centre but also 'in the wild'. At the same time, Adobe were committing to a very ambitious effort to create a dynamic peer-to-peer network for media streaming across its near-ubiquitous Flash player install base.

Thus the design goals focused on creating a generic, common software substrate for the development and deployment of highly scalable, highly robust software applications and services. It had to not only provide application platform primitives such as communications, data storage and lifecycle support, but also a consistent and easy-to-consume set of capabilities required to achieve scale and resilience - load balancing, failover, dynamic resource management and allocation.

This became Pi, the name alluding to the infinite number of digits in its decimal representation.

Pi is of course not alone in seeking to offer these capabilities. Rather than striving for a 'magical' approach - 'just write your app, and we'll make it scale' - the approach Pi takes is to explicitly define a 'menu' of tradeoffs needed to achieve scale and resilience, recommend patterns for implementing these tradeoffs, and provide APIs to facilitate and guide that implementation. When scale and resilience matter, abstractions should be a transparent means of convenience rather than opaquely hiding details that need to be understood as the system evolves.

The first set of services developed on Pi, named Pi Cloud (no relation to PiCloud!) provided de-facto standard infrastructure-as-a-service (IaaS) compute and storage services API-compatible with Amazon's EC2 and S3. Much of what follows took shape over the course of building and running this IaaS platform.

Anatomy of Pi

The foundations of Pi's distributed, decentralised architecture borrow heavily from peer-to-peer and specifically distributed hashtable (DHT) paradigms, and from several successful decentralised Internet-scale deployments, most notably pre-Microsoft Skype and BitTorrent.

Pi nodes - each with its own unique identifier - form a ring of peers (a structured p2p overlay), with each node tracking a set of peers. A core appeal of this organisation is that it allows one node to route a message to another via a bounded number of intermediate nodes; thus by mapping resources (such as data or services) to identifiers, nodes can locate and access those resources without having to have specific knowledge (addresses, routing etc) of how to access them.
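To make the idea of mapping resources to identifiers concrete, here is a minimal sketch of how a key can be resolved to the node whose ID is nearest to it on a circular 160-bit ID space. The class and method names (RingSketch, distance, nearestNode) are illustrative assumptions, not part of Pi or FreePastry. A real overlay reaches the same answer hop by hop using each node's partial view of its peers, whereas this sketch simply scans a full list of node IDs.

```java
import java.math.BigInteger;
import java.util.List;

// Hypothetical sketch: IDs live on a circular space of size 2^160.
final class RingSketch {
    static final BigInteger RING_SIZE = BigInteger.ONE.shiftLeft(160);

    // Distance between two IDs, measured the short way round the ring.
    static BigInteger distance(BigInteger a, BigInteger b) {
        BigInteger d = a.subtract(b).mod(RING_SIZE); // mod() is never negative
        return d.min(RING_SIZE.subtract(d));
    }

    // The node responsible for a key is the one whose ID is nearest to it.
    // Returns null if there are no nodes.
    static BigInteger nearestNode(List<BigInteger> nodeIds, BigInteger key) {
        BigInteger best = null;
        for (BigInteger id : nodeIds) {
            if (best == null || distance(id, key).compareTo(distance(best, key)) < 0) {
                best = id;
            }
        }
        return best;
    }
}
```

Note that a key just above zero can be nearest to a node ID just below 2^160, which is what makes the structure a ring rather than a line.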

The initial implementation of the structured p2p overlay / DHT was provided by the FreePastry library, which takes care of organising nodes into a ring, providing message routing and delivery, and exposes events relating to messaging and node lifecycle to applications.

Above Pastry sits Pi Core, the p2p application framework that provides developer-friendly APIs for application creation, messaging, data access and management, and much more. It also abstracts away the specific DHT implementation provided by Pastry.

Pi Cloud services are implemented as a series of Pi applications, starting with a shared module that defines the shared data entities, followed by applications for network, volume, image, storage and management, and finally the API application which exposes the APIs. These applications are then packaged together to form a deployable artifact - in the case of Pi Cloud, an RPM package - to be deployed on each server node, and run in a single JVM instance.

Pi is Java-based, with Pi Cloud featuring a small number of platform-specific aspects, largely around networking for machine instances. These currently work with CentOS / RedHat Linux.

The initial release of Pi Cloud was deployed on Rocks Clusters, and used Parascale and Gluster as the underlying distributed storage substrate.

Developing Pi Applications

Developers coming to Pi with a background in prevalent server-side request / transaction processing programming models typically find they have to 'get their head around' several core concepts that may well be familiar in a different context. These commonly include:

Patterns of Pi

1. Ring Creation and Initialisation

Ring Initiator (How does a node start a new Pi ring?)

In Pi, system nodes are peers that form a DHT in a ring structure, enabling routing to and location of resources. To join a ring, a Pi node goes through a bootstrapping step that starts with contacting a node that is already in that ring. This allows it to discover its place in the ring (based on its randomly generated 160-bit node ID), and populate its initial list of known peers.

Pi currently supports peer discovery based on a list of known hostnames or IP addresses, with intention to also support broadcast and other mechanisms.

When started, all nodes attempt to join an existing Pi ring by using one or more of the available peer discovery mechanisms (see above). If this fails or times out, a node will by default not start a new ring.

In order to create a new ring, a configuration flag that normally prevents nodes from starting new rings needs to be disabled. This allows a node to start a new ring. Once a node has started a new ring, other nodes can bootstrap off of it, inheriting its properties where applicable (for instance, region and availability zone in the case of Pi Cloud).
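The start-up decision described above can be sketched as follows. This is an illustration only: the names BootstrapSketch, Outcome and preventNewRing are assumptions, and the tryJoinVia predicate stands in for whatever the real join handshake with an existing ring member involves.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the join-or-create decision at node start-up.
final class BootstrapSketch {
    enum Outcome { JOINED_EXISTING_RING, STARTED_NEW_RING, FAILED }

    // knownPeers:     hostnames or IP addresses used for peer discovery
    // tryJoinVia:     stand-in for the join attempt; true if the peer let us in
    // preventNewRing: the flag that normally stops nodes from starting rings
    static Outcome bootstrap(List<String> knownPeers,
                             Predicate<String> tryJoinVia,
                             boolean preventNewRing) {
        for (String peer : knownPeers) {
            if (tryJoinVia.test(peer)) {
                return Outcome.JOINED_EXISTING_RING;
            }
        }
        // No peer could be reached: only start a ring if explicitly allowed.
        return preventNewRing ? Outcome.FAILED : Outcome.STARTED_NEW_RING;
    }
}
```

Keeping the flag enabled by default means that a node which merely failed to reach its peers cannot accidentally split off into a ring of its own.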

Seeder (How does a DHT get initialised with required data?)

Some applications require the presence of one or more data items in the DHT in order to function. Although it is possible to have applications create required data lazily if attempts to read them fail, this makes it difficult to distinguish between data that has not yet been created and data that has become inaccessible due to a fault, and runs the risk of creating multiple copies of the same entity.

Some examples of data items that may need seeding include entities used to determine whether or not an application should be active on a certain node, records used for resource allocation, and task queue records.

A seeder is simply a service that allows such records to be created in the DHT when a new ring has been started. It usually performs a series of DHT store operations, using configuration data specified via the service interface.

In addition, a seeder may be used to initialise a new subset of nodes within an existing ring (for instance to create configuration for a specific environment or availability zone, for use by nodes within that environment), or to alter the settings of a previously seeded record.
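A seeder can be sketched as a thin wrapper around create-if-absent and overwrite operations. In the sketch below a ConcurrentHashMap stands in for the DHT, and the names SeederSketch, seed and reseed are assumptions made for illustration rather than Pi's actual service interface.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: an in-memory map stands in for the DHT.
final class SeederSketch {
    private final Map<String, String> dht = new ConcurrentHashMap<>();

    // Creates the record only if it does not exist yet.
    // Returns true if this call created it, false if it was already seeded.
    boolean seed(String key, String json) {
        return dht.putIfAbsent(key, json) == null;
    }

    // Deliberately alters the settings of a previously seeded record.
    void reseed(String key, String json) {
        dht.put(key, json);
    }

    String read(String key) {
        return dht.get(key);
    }
}
```

Because seeding is explicit, an application that fails to read a seeded record can treat that as a fault rather than quietly creating a second copy.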

2. Applications

Pi Application (How do I create a Pi application?)

Developers simply extend the appropriate Pi application base class. There is a 'vanilla' Pi application base class, as well as its extensions if additional functionality is required - for instance:

Then define application activation strategy (see below) and implement handlers for all or some of the following:

The current implementation of Pi recommends making each application a Spring bean, and having Spring inject required dependencies.

Application Activator (How can I have an application be active on a subset of nodes in the p2p network?)

Some applications will want to run only on a subset of nodes. Examples might include applications that expose a service on one of N IP addresses, applications that perform some intensive processing, or those that aggregate information.

An Application Activator provides a strategy that is used to determine whether or not an application should be active on a particular node. The meaning of 'active' and 'passive' is entirely application-specific - indeed it is not uncommon in Pi for 'passive' applications to still perform functions like monitoring and routing messages that pass through them.

Pi currently comes with three activators:

It is additionally possible to specify basic exclusion rules between applications, such that an application is prevented from becoming active if some other specific application is already active. This may be desirable if, for instance, two applications may conflict in some way if run on the same node, or place high load on compute, network and / or storage resources on that node.
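The combination of an activation strategy and exclusion rules might look roughly like this. The Activator interface and mayActivate method are hypothetical names chosen for the sketch; they do not reproduce Pi's own activator classes.

```java
import java.util.Set;

// Hypothetical sketch of activation with basic exclusion rules.
final class ActivationSketch {
    // Strategy deciding whether an application wants to be active on a node.
    interface Activator {
        boolean shouldBeActive(String nodeId);
    }

    // activeApps: applications already active on this node
    // excludedBy: applications whose presence prevents activation
    static boolean mayActivate(Activator activator, String nodeId,
                               Set<String> activeApps, Set<String> excludedBy) {
        for (String other : excludedBy) {
            if (activeApps.contains(other)) {
                return false; // a conflicting application is already active
            }
        }
        return activator.shouldBeActive(nodeId);
    }
}
```

Checking the exclusion rules first means the strategy itself never has to know which other applications exist.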

Exposed Service (How can I expose a service to external consumers?)

Pi can automatically manage the exposure of an application via a set of IP addresses, across a range of nodes. This is achieved by extending the base class that provides managed addressing capabilities for a Pi application, selecting the shared DHT record application activator, and seeding the relevant application record with appropriate addresses in the DHT.

3. Data

Distributed Hashtable (DHT) (How do I store and retrieve persistent data in Pi?)

Pi provides an implementation of a DHT, acting as a distributed key-value store. Currently this is optimised for storage and retrieval of relatively 'small' data records. The intention was to also provide support for append operations on entries, with a view to supporting easy and efficient gathering of large amounts of data for later indexing or processing.

Pi DHT provides the following operations:

There is no delete operation due to the replicated and dynamic nature of a DHT. Applications can however add a 'tombstone' flag to mark records as deleted and treat them as such.
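One way an application might apply the tombstone approach is sketched below. The Entry type, its deleted flag and the method names are assumptions for illustration, and an in-memory map again stands in for the DHT.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: records are never removed, only flagged as deleted.
final class TombstoneSketch {
    static final class Entry {
        final String value;
        final boolean deleted;

        Entry(String value, boolean deleted) {
            this.value = value;
            this.deleted = deleted;
        }
    }

    private final Map<String, Entry> dht = new ConcurrentHashMap<>();

    void write(String key, String value) {
        dht.put(key, new Entry(value, false));
    }

    // "Deleting" rewrites the record with the tombstone flag set.
    void markDeleted(String key) {
        dht.computeIfPresent(key, (k, e) -> new Entry(e.value, true));
    }

    // Readers treat tombstoned records as if they were absent.
    Optional<String> read(String key) {
        Entry e = dht.get(key);
        return (e == null || e.deleted) ? Optional.empty() : Optional.of(e.value);
    }
}
```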

Pi's DHT API comes in asynchronous and synchronous (blocking) flavours. Asynchronous should be used wherever possible. Blocking DHT operations are provided for convenience, but should only be used where there is certainty that they will never be invoked from the event worker thread. An example would be a DHT operation invoked from a web server thread whilst processing a request.
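The difference between the two flavours can be illustrated with standard java.util.concurrent types. This is not Pi's API: DhtAccessSketch, readAsync and readBlocking are hypothetical names, and a local map plays the part of the remote DHT.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of asynchronous and blocking read flavours.
final class DhtAccessSketch {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    DhtAccessSketch put(String key, String value) {
        store.put(key, value);
        return this;
    }

    // Asynchronous flavour: returns immediately, the caller attaches a continuation.
    CompletableFuture<String> readAsync(String key) {
        return CompletableFuture.supplyAsync(() -> store.get(key));
    }

    // Blocking flavour: parks the calling thread until the result arrives,
    // so it must never be called from a thread that also has to deliver that result.
    String readBlocking(String key, long timeoutMillis) {
        try {
            return readAsync(key).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A caller on an event thread would use readAsync(key).thenAccept(...) and return at once, while a web server thread handling a request could reasonably call readBlocking.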

In 'classic' DHT fashion, DHT records are stored on the node with the node ID that is nearest to the ID of the key. Records being written are typically replicated to one or more nodes adjacent (in node ID space) to the nearest-ID node. How many replicas are written in total, and how many replicas have to be written in order for a write to be deemed successful, are configuration parameters of Pi that express a tradeoff between write availability and consistency.
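The two replication parameters can be sketched as a replica count and a required number of acknowledgements. Everything named here (ReplicationSketch, replicaSet, write, requiredAcks) is an assumption for illustration, and the sketch uses small long IDs and ignores ring wrap-around to stay short.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of replica placement and a write quorum.
final class ReplicationSketch {
    // Picks the nodes whose IDs are nearest to the key: the first is the
    // primary, the rest are its neighbours in ID space.
    static List<Long> replicaSet(List<Long> nodeIds, long key, int replicas) {
        List<Long> sorted = new ArrayList<>(nodeIds);
        sorted.sort(Comparator.comparingLong(id -> Math.abs(id - key)));
        return sorted.subList(0, Math.min(replicas, sorted.size()));
    }

    // storeOn stands in for the remote store call and reports success per node.
    // The write is deemed successful once enough replicas have acknowledged it.
    static boolean write(List<Long> targets, Predicate<Long> storeOn, int requiredAcks) {
        int acks = 0;
        for (Long node : targets) {
            if (storeOn.test(node)) {
                acks++;
            }
        }
        return acks >= requiredAcks;
    }
}
```

Requiring fewer acknowledgements keeps writes available when some replicas are unreachable, at the cost of a higher chance that a later read sees stale data.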

Data records in the DHT are stored with a set of HTTP-like headers that represent metadata describing the record's ID, its URI (and thus, via the scheme, the type of the stored resource), content type, version and so forth.

In addition to using remote DHT APIs, each node can also 'scan' DHT data stored locally on that node. This is very useful when data with certain properties - perhaps a record that has expired in some way - needs to be found and acted upon in some way. There is also support for indexing local resources of a given type, then publishing the result to some set of supernodes to provide runtime querying, aggregation, search or reporting facilities. These local DHT access APIs are not used for altering or updating data as this side-steps the ID-based scheme that drives data record location.

Entity Identifiers and Modelling (How do I model, identify and locate domain-specific entities stored in Pi?)

Pi mandates the use of a common data model for application entities - naturally there is a need for applications on different nodes to be able to communicate, share data, and understand that data. Furthermore, in general a large P2P network will consist of different versions of applications, hence applications have to be robust to this, and in some cases code for multiple versions of a given entity, as already indicated under Data Storage in the section on Developing Pi Applications.

Pi entities are consequently modelled as Java objects that implement a lightweight interface (PiEntity), serialised and stored as JSON. This provides a structured data format, without imposing a rigid schema, allowing for a less rigid data processing model whilst still allowing explicit structure that each application expects to be supported. Fine-grained control over binding of JSON to Java is done through standard Java annotations.

Pi treats all instances of entities as resources, each with its own URI. The scheme of the URI represents the type of the resource.

For Pi Cloud, there is an explicit mapping to Pi / DHT ID for a resource from the combination of the URI and the scope of that resource (availability zone, region or global). An ID generator function is thus provided to return the correct ID given this scope and identifier information.
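An ID generator of this kind could, for example, hash the scope together with the URI to obtain a 160-bit ID. The sketch below is purely illustrative: the source does not say how Pi Cloud derives IDs, so the use of SHA-1, the input layout, and the names IdSketch, Scope and idFor are all assumptions.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: derive a 160-bit ID from a resource URI and its scope.
final class IdSketch {
    enum Scope { AVAILABILITY_ZONE, REGION, GLOBAL }

    // scopeName identifies the concrete zone or region (ignored in spirit for GLOBAL).
    static BigInteger idFor(String uri, Scope scope, String scopeName) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            String input = scope + ":" + scopeName + ":" + uri;
            // SHA-1 yields 20 bytes, i.e. a non-negative number below 2^160.
            return new BigInteger(1, sha1.digest(input.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-1 should be available on every Java platform", e);
        }
    }
}
```

The property that matters is determinism: any node given the same URI and scope computes the same ID, and can therefore locate the record without a directory lookup.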

Searching and Indexing (How do I aggregate data, or find data matching certain criteria?)

Pi's decentralised nature, and its support for basic data access operations only, lead to several techniques for indexing, search, discovery and reporting, with the effort required growing with dataset size.

For small, limited-size data sets, a single DHT record can be used, and can simply be searched or processed to generate reports.

Taking this a step further, a finite, known set of DHT records can be used to store data, and these can be indexed periodically to create / update a set of index records that can be used for searches. There will be a latency associated with indexed data.

For queries that are known at development time, a search through records on the local node can be used, with the discovery of a matching record triggering some action.

Where the query is not known until runtime, the local DHT store can be examined, and data on any records of interest published to a set of supernodes for aggregation and querying.

Finally, where latency is not an issue, DHT records can be batch-processed for reporting, indexing and other purposes.

These are all techniques that have been used with existing Pi applications. This list is certainly not exhaustive however.

Caching (How do I ensure DHT lookup results are reused?)

DHT reads and writes can be expensive, hence caching becomes important early on. Pi provides blocking and non-blocking DHT cache access APIs, with both read-through and write-through capabilities. These should be used with the same constraints already described around the use of blocking and non-blocking DHT access APIs, with non-blocking preferred wherever possible.
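Read-through and write-through behaviour can be sketched in a few lines. This does not use Ehcache or Pi's cache API: DhtCacheSketch is a hypothetical class, and the dhtRead and dhtWrite functions stand in for the underlying DHT operations.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical sketch of a read-through / write-through cache in front of a DHT.
final class DhtCacheSketch {
    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final Function<String, String> dhtRead;
    private final BiConsumer<String, String> dhtWrite;

    DhtCacheSketch(Function<String, String> dhtRead, BiConsumer<String, String> dhtWrite) {
        this.dhtRead = dhtRead;
        this.dhtWrite = dhtWrite;
    }

    // Read-through: on a miss the value is fetched from the DHT and remembered.
    // A null result (record not found) is not cached.
    String get(String key) {
        return cache.computeIfAbsent(key, dhtRead);
    }

    // Write-through: the DHT is updated first, then the cached copy.
    void put(String key, String value) {
        dhtWrite.accept(key, value);
        cache.put(key, value);
    }
}
```

A production cache would also need expiry or invalidation, since other nodes can update the same record in the DHT.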

Pi's current cache implementation is backed by Ehcache.

4. Messaging

Message Passing and Routing (How do I send a message from one node to another?)

Pi makes it easy to send messages from one node to another. All that is required is a destination ID, which can be computed from a resource as described in the Data section and provides a convenient location mechanism without requiring knowledge of the resource's address. The message will be delivered to the node whose node ID is the nearest to this destination ID.

Pi messages typically consist of a destination ID, method (a CRUD verb) and a Pi entity being sent. Optionally, a message can be sent to an application that is different from the sending application. Messages additionally contain elements that are generally not visible to the application, above all a unique message identifier, a correlation identifier for matching responses with requests (where applicable), and a transaction ID used to group related messages together.

Messages are sent through a message context. A new message context can be created trivially from a message context factory (an interface implemented by all Pi applications), or a message context of a received message can be used to send a new message. The latter approach has the advantage of keeping a consistent transaction ID between messages - thus related log messages across a series of nodes can easily be identified.

This structure can be altered or replaced if required.

Note that Pi messaging is inherently unreliable. Although nodes do have a level of routing logic that improves robustness under node churn, sent messages are not guaranteed to arrive at their destination. Other mechanisms need to be used to ensure reliability.

Publish-subscribe (How can a set of nodes advertise a service or interest?)

Pi supports the standard publish-subscribe paradigm. By leveraging properties of the p2p overlay, the implementation is entirely decentralised and able to support topics and subscribers in a massively scalable manner. Topics are mapped to Pi IDs. Subscribers to a topic join that topic by adding themselves to a tree of subscribers, rooted at the node closest to the topic ID. Nodes wishing to publish to a topic do so by sending a message toward the topic ID; this is intercepted and handled en route by the first node that is subscribed to that topic. If there are no subscribers, the message terminates at the topic ID node, where the 'no subscribers' condition can be handled.

All of this is abstracted from Pi applications, which only have to worry about declaring that they wish to subscribe to or unsubscribe from a topic, and optionally handling the 'no subscribers' case.

Anycast (How do I send a message to one of a series of interested recipients?)

Anycast involves sending a message to a topic, such that the message is processed sequentially, one node at a time. A node that receives an anycast message may process it without forwarding it on to other topic subscribers, alter and / or process it partially before forwarding, or forward it on without processing.

Anycast messages can be sent 'directly' or via a randomly chosen intermediate node. The latter is useful when wanting to ensure that anycast messages sent from a particular node do not traverse the topic tree the same way, providing better distribution of messages across subscribers.
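The sequential nature of anycast, and the effect of choosing a different entry point, can be sketched as follows. The sketch flattens the topic tree into a list and models only the 'process or forward' choice; AnycastSketch and its parameters are hypothetical names, not Pi's API.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch: offer a message to subscribers one at a time.
final class AnycastSketch {
    // Each subscriber returns true if it handled the message, false to forward it on.
    // startIndex models entering the topic via a different (e.g. random) node.
    // Returns the index of the subscriber that handled the message, or -1 if none did.
    static <M> int anycast(List<Predicate<M>> subscribers, M message, int startIndex) {
        int n = subscribers.size();
        for (int i = 0; i < n; i++) {
            int index = Math.floorMod(startIndex + i, n);
            if (subscribers.get(index).test(message)) {
                return index;
            }
        }
        return -1; // nobody could handle it
    }
}
```

Varying startIndex between sends spreads the load, since otherwise the first capable subscriber in the traversal order would receive every request.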

Among other uses, anycast provides an effective, decentralised mechanism for finding nodes that can handle a certain request or supply a certain resource.

Broadcast (How do I send a message to a group of interested recipients?)

An application can broadcast a message to a topic. Subscribers to a given topic will distribute the message to all known peers that have also subscribed to the topic. The message is in effect distributed throughout the topic node tree in parallel, making broadcast efficient.
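A broadcast through the subscriber tree can be sketched as a recursive fan-out. TopicNode and its fields are illustrative assumptions; in a real deployment each node would forward to its children over the network and in parallel, whereas this sketch walks the tree sequentially in one process.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: each subscriber delivers locally, then forwards to its children.
final class BroadcastSketch {
    static final class TopicNode {
        final String name;
        final List<TopicNode> children = new ArrayList<>();
        final List<String> received = new ArrayList<>();

        TopicNode(String name) {
            this.name = name;
        }

        TopicNode add(TopicNode child) {
            children.add(child);
            return this;
        }

        // Returns the number of nodes the message reached, including this one.
        int broadcast(String message) {
            received.add(message);
            int reached = 1;
            for (TopicNode child : children) {
                reached += child.broadcast(message);
            }
            return reached;
        }
    }
}
```

Because every node only forwards to its own children, no single node has to know about, or send to, all subscribers.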

5. Resource Management

Consumed Resource Registry (How do I track the usage of a resource that may have multiple consumers and / or require co-ordination across nodes?)

Sometimes distributed applications, or different applications running on the same node, need to manage the lifecycle of a resource, creating it on demand and destroying it when no longer needed. This becomes particularly important when that resource is scarce, consumes other resources, or when duplicate instances of the same resource are problematic. Examples from Pi Cloud include network security group NATing (used by apps / VMs on multiple nodes) and network setup for VMs (used by multiple apps).

Pi provides a simple 'consumed resource registry' component, which allows applications to track how many consumers a particular resource has, and thus determine when a resource needs to be created (first consumer registered) or removed (last consumer deregistered).
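The first-consumer / last-consumer logic can be sketched as a small registry. The class and method names below are assumptions for illustration and do not mirror Pi's actual component.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a local consumed resource registry.
final class ConsumedResourceRegistrySketch {
    private final Map<String, Set<String>> consumers = new HashMap<>();

    // Returns true if this is the first consumer, i.e. the caller should create the resource.
    synchronized boolean register(String resourceId, String consumerId) {
        Set<String> set = consumers.computeIfAbsent(resourceId, k -> new HashSet<>());
        boolean first = set.isEmpty();
        set.add(consumerId);
        return first;
    }

    // Returns true if the last consumer has gone, i.e. the caller should remove the resource.
    synchronized boolean deregister(String resourceId, String consumerId) {
        Set<String> set = consumers.get(resourceId);
        if (set == null || !set.remove(consumerId)) {
            return false; // unknown resource or consumer: nothing to do
        }
        if (set.isEmpty()) {
            consumers.remove(resourceId);
            return true;
        }
        return false;
    }
}
```

Tracking consumer identities rather than a bare counter makes repeated registration by the same consumer harmless.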

There is also an extended resource registry implementation, a 'cached consumed DHT resource registry'. This is appropriate when a resource is used across multiple nodes (as in the security group scenario above). In this case, an entry in the DHT is used to track resource state. Apps update and watch the DHT record to reflect resource lifecycle and if required track its consumers. Each node tracks changes to that resource state via a resource refresh strategy, for instance enabling the triggering of a periodic refresh of the resource through reloading the DHT record, and allows specific actions to be performed upon refresh. For fault tolerance, the semantics of the implementation also allow each consumer of a resource to itself be probed (ping / heartbeats) to ensure that particular consumer still exists. Cached resource records can be retrieved on demand, as can a list of consumers for a specific resource, and a list of resources of a certain type. This is essential when applications need to track state in memory, avoiding the need for frequent DHT operations. When a resource needs to be allocated to a unique owner node, this is typically based on node ID proximity to the ID of the resource name hash.

Subscriber Resource Allocation (How do I allocate resources available across a set of nodes?)

A highly scalable, decentralised scheme for resource allocation involves nodes that possess the given resource advertising its availability by subscribing to a topic. A client wishing to utilise that resource sends an anycast message to the topic. This is handled by one or more recipient nodes, which allocate the resource as appropriate.

6. Fault Tolerance

Task Processing Queues (How do I capture a task or operation so that it can be retried if unsuccessful?)

Pi provides a means of storing details of pending operations (tasks) which can be carried out in an idempotent fashion, and hence retried if a previous attempt to execute had an unsuccessful or unknown outcome.

A Pi application can define one or more task processing queues, usually through seeding underlying DHT records. When a task needs to be performed, it is added to the appropriate queue. Instances of that same application monitor the queue for any tasks that have not been picked up or have timed out (become stale) so can be retried. A maximum number of retries can be set before tasks are reported as failed.

If the initial (or subsequent) execution of a task succeeds, the corresponding queue item is removed. If, however, it fails, the polling mechanism will ensure it gets retried.
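The queue behaviour described above can be sketched with an in-memory structure. In Pi the queue is backed by DHT records; here a local map is used instead, and the names TaskQueueSketch, poll, maxRetries and staleAfterMillis are assumptions. Time is passed in explicitly to keep the sketch deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a task queue with stale detection and bounded retries.
final class TaskQueueSketch {
    private static final class Task {
        int attempts;
        long lastStartedAt = -1; // -1 means never picked up
    }

    private final Map<String, Task> pending = new LinkedHashMap<>();
    final List<String> failed = new ArrayList<>();
    private final int maxRetries;
    private final long staleAfterMillis;

    TaskQueueSketch(int maxRetries, long staleAfterMillis) {
        this.maxRetries = maxRetries;
        this.staleAfterMillis = staleAfterMillis;
    }

    void add(String taskId) {
        pending.putIfAbsent(taskId, new Task());
    }

    // A successful execution removes the queue item.
    void complete(String taskId) {
        pending.remove(taskId);
    }

    // Returns the tasks to (re)try now: those never picked up or gone stale.
    // Tasks that have used up their retries are moved to the failed list.
    List<String> poll(long nowMillis) {
        List<String> due = new ArrayList<>();
        Iterator<Map.Entry<String, Task>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Task> entry = it.next();
            Task task = entry.getValue();
            boolean neverStarted = task.lastStartedAt < 0;
            boolean stale = !neverStarted && nowMillis - task.lastStartedAt >= staleAfterMillis;
            if (!neverStarted && !stale) {
                continue; // still in progress
            }
            if (task.attempts > maxRetries) {
                failed.add(entry.getKey());
                it.remove();
                continue;
            }
            task.attempts++;
            task.lastStartedAt = nowMillis;
            due.add(entry.getKey());
        }
        return due;
    }
}
```

Because a stale task may in fact have completed on a node that then failed to report back, retries are only safe when the task itself is idempotent.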

Watchers / Supervisors (How do I detect and resolve anomalies resulting from failures or malfunctions?)

Pi applications can implement watcher components to monitor and rectify issues - anything from misconfiguration of the local node to data inconsistencies to consumer state monitoring. Pi provides a watcher service, which allows application-specific watchers to be registered for periodic execution. The granularity and frequency of each watcher can be whatever is appropriate for each application.
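A watcher service of this kind can be approximated with a standard ScheduledExecutorService. The sketch below is not Pi's watcher service: WatcherServiceSketch and register are hypothetical names, and a real implementation would log failures rather than silently swallowing them.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: run application-specific watchers periodically.
final class WatcherServiceSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Each watcher chooses its own period.
    void register(Runnable watcher, long periodMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                watcher.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs of this
                // watcher, so it is contained here (a real service would log it).
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Containing exceptions matters here because a watcher exists precisely to keep running when other things are going wrong.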

Eager Node Failure Recovery (How do I detect and act on a node failure in a timely manner?)

Pi delivers node arrival and departure events to applications. Among other things, this can be used to detect node failure, reported as a departure event, and trigger appropriate action. This can mean eagerly activating an existing recovery mechanism, for instance forcing application activation checks, picking up a task from a task queue, or sending an anycast message to a set of subscribers capable of taking over from the departed node.

TODO: tooling, experience - 60,000 LoC, 110,000 tests - worked! blown away by demo of pulling cable out - development model - DHT structure - what constitutes an outage - various levels - network, infra etc - all about health checks - skipnet - tooling - logging / troubleshooting - subset would be nice - groups / spread - cost of ownership / support - releases cf FB - nauman - cricket - cap theorem - orphaned nodes - shutting down, state machine approach - user DB (divided opinions - not unlike skype before MS) - processing - dd, instance creation - nice only helped so much, needed to queue / co-ordinate in app - networking - back and forward network - we never had cascading outage (apart from that time we blew up JVMs across the cluster - but that was us :)) - functional, state machines - p2p absolutely does not absolve one of responsibility to consider system properties

Refs - pastry, kademlia - commutative data types - promise of gossip - pss / t-man - network - layer 2 - skipnet - microbursts - CRDTs - dynamo - skype - bit torrent?