Data Plugs

In the Dataswift PDA ecosystem, a Data Plug is a minimal API-to-API web service whose primary purpose is to fetch data from a 3rd party API and push it to an individual's PDA. Due to security considerations and the limited scope of required functionality, each Data Plug is granted write-only access to any particular PDA. Authorisation to post data is obtained via an API call to the Data Exchange service and requires prior registration of the plugin. Furthermore, plugins can only post data to a limited namespace corresponding to their registration name.

In order to create a fully functioning Data Plug, the following general requirements need to be met:

  • Obtain user's authorisation to access their data via the 3rd party API

  • Obtain authorisation to write data to the individual's PDA

  • Implement general logic to move data from the API to the PDA

  • Implement logic to keep track of synchronised data for each endpoint

Solutions for the above requirements can be implemented with any backend technology stack as long as the API requirements can be met.

Open Source Data Plugs

Aiming to streamline the process of Data Plug development, we've created an open source repository that generalises many of the solutions to common tasks. While the code is written in Scala and uses the Play Framework, a functioning Data Plug can still be built with limited knowledge of either of these technologies.

Currently, the codebase is centered around a "core" implementation that automates most of the processes related to authorisation and data synchronisation. Each individual plugin then extends the core and adds its own API-specific logic to deliver a fully functioning service.

The focus of this tutorial is to give a brief introduction to how the open source codebase is set up, show examples of how certain logical aspects are implemented, and provide enough information to bootstrap your own Data Plug as quickly as possible.

Authenticating Users

A Data Plug can function correctly only if it has been authorised to access both the 3rd party API and the individual's PDA API. Furthermore, the service has to maintain valid authentication tokens for both APIs to remain in the "connected" state. As soon as access to either API is lost, a "disconnected" status should be reported.

Authenticating with a PDA

The core plugin implementation provides both an interface for user authentication and background services for authentication and authorisation. Plugin authors can easily customise UI texts, titles, and messaging by modifying the conf/messages.en file in their particular plugin project. For example, see the messages.en file for the Facebook plugin. Alternatively, a completely custom user interface can be set up by implementing your own DataPlugViewSet controller and binding it in the project module.

A more streamlined user authentication experience is also supported. The user interface can be skipped altogether if the redirecting application adds token=${APP_TOKEN} and redirect=${REDIRECT_URL} query parameters. Here, APP_TOKEN refers to a JWT app token signed by the owner's HAT for that particular plugin, and REDIRECT_URL specifies the callback URL to go to after a successful setup.
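
For example, a plug hosted at a hypothetical address could be launched directly into the setup flow with a URL of roughly this shape (the host is illustrative and the exact path depends on the deployment):

https://facebook.dataplug.example.com/?token=${APP_TOKEN}&redirect=${REDIRECT_URL}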

The core plugin background services also fully automate the caching and refreshing of HAT access tokens, so that the authenticated state is maintained at all times.

Authenticating with 3rd party APIs

Authentication with most popular APIs works via the OAuth standard. The project uses Silhouette, an authentication library for the Play Framework, to simplify the login setup process as much as possible. The library comes with a few default integrations (Facebook, Instagram, etc.), but the plugin author will need to implement a custom authentication provider for less popular services. In that case, it will be necessary to create a tweaked authProvider version and supply it to Silhouette instead. The specifics of the required changes depend heavily on the API and cannot be covered in general terms. We would advise reading the Silhouette documentation, particularly the section on authentication providers. There are also some examples in the current project repository that can be used for inspiration:

  • Fitbit Provider — repo

  • Monzo Provider — repo

  • Spotify Provider — repo

Silhouette-specific configuration parameters also have to be included with the project. These can be found in the conf/silhouette.conf file, and the particular list of parameters will depend on (1) the authentication framework used (examples here), and (2) any custom behaviours added to the default authentication provider. For example, in the Fitbit plugin we've added the authorizationParams, refreshHeaders and customProperties parameters to ensure the correct functioning of the auth provider.

fitbit {
  authorizationURL = "https://www.fitbit.com/oauth2/authorize"
  accessTokenURL = "https://api.fitbit.com/oauth2/token"
  redirectURL = "http://dataplug.hat.org:9000/authenticate/fitbit"
  redirectURL = ${?FITBIT_CALLBACK_URL}
  refreshURL = "https://api.fitbit.com/oauth2/token"
  clientID = ""
  clientID = ${?FITBIT_CLIENT_ID}
  clientSecret = ""
  clientSecret = ${?FITBIT_CLIENT_SECRET}
  scope = "activity heartrate location profile sleep weight"
  authorizationParams {
    response_type = "code"
  }
  refreshHeaders {
    Content-Type = "application/x-www-form-urlencoded"
  }
  customProperties {
    authorization_header_prefix = "Basic"
    parameters_location = "query"
  }
}

Adding Endpoint Interfaces

Core Data Plugs implement the general logic of fetching data from an API endpoint, posting it to the PDA, tracking the status of the latest operation and setting up parameters for the next scheduled task. Synchronisation is performed by polling the API at configured time intervals on an endpoint-by-endpoint basis. Each endpoint interface is represented by a class that extends the DataPlugEndpointInterface trait.

For example, the FacebookFeedInterface class:

class FacebookFeedInterface @Inject() (
    val wsClient: WSClient,
    val userService: UserService,
    val authInfoRepository: AuthInfoRepository,
    val tokenHelper: OAuth2TokenHelper,
    val cacheApi: CacheApi,
    val mailer: Mailer,
    val scheduler: Scheduler,
    val provider: FacebookProvider) extends DataPlugEndpointInterface with RequestAuthenticatorOAuth2 {}

Note that the class also extends the RequestAuthenticatorOAuth2 trait, which contains request authentication helper methods for the OAuth2 standard.

The DataPlugEndpointInterface trait mandates that certain configuration parameters and action methods are present in the class. Let's look at the configuration setup first.

val namespace: String = "facebook"
val endpoint: String = "feed"
val refreshInterval = 1.hour
val defaultApiEndpoint = FacebookFeedInterface.defaultApiEndpoint

The namespace and endpoint parameters indicate where the information obtained from the API should be stored on the PDA. The data synced by the example setup above will be available at the /api/v2.6/data/facebook/feed path on the owner's PDA. Note that, due to the PDA's namespacing rules, the namespace parameter must match the Data Plug's registration name.

The refresh interval indicates how often API polling should be performed for that particular endpoint. Keep in mind that if there are two separate endpoints in the Data Plug, one polling every 60 minutes and the other every 30 minutes, the plug will make 3 requests per hour in total; configure the service so that it respects the rate limits of the API.

Finally, the default API endpoint for fetching data needs to be provided — it is usually placed in the accompanying object and contains all the information needed for the initial API fetch.

object FacebookFeedInterface {
  val defaultApiEndpoint = ApiEndpointCall(
    "https://graph.facebook.com/v2.10",
    "/me/feed",
    ApiEndpointMethod.Get("Get"),
    Map(),
    Map("limit" -> "500", "format" -> "json", "fields" -> ("id,admin_creator,application,call_to_action,caption,created_time,description," +
      "feed_targeting,from,icon,is_hidden,is_published,link,message,message_tags,name,object_id,picture,place," +
      "privacy,properties,shares,source,status_type,story,targeting,to,type,updated_time,with_tags,full_picture")),
    Map(),
    Some(Map()))
}

The default API endpoint is constructed as an instance of the ApiEndpointCall case class. In the example above, the base URL of the service, the path to a particular endpoint, the request method (GET), and a list of query parameters are all specified in the object. The supplied object is only used in the initial data fetch: every successful API call creates an updated version of the ApiEndpointCall, saves it in the database and uses it to infer the latest state of synchronisation. In the case of the Facebook feed, since and after query parameters are added dynamically to control the time interval of posts requested in the subsequent synchronisation rounds.

In order to correctly evolve the state of the next ApiEndpointCall object, each interface needs to implement two custom methods: buildContinuation and buildNextSync. Both methods receive the response body content of the API call and the ApiEndpointCall object used to make that call. The buildContinuation method is always called first. Within this method the plugin author needs to evaluate the API response and determine whether the synchronisation round can be finished or whether further requests are needed to paginate over more data. In the former case None should be returned, and in the latter case an ApiEndpointCall object modified to take the pagination parameters into account. Pagination requests will loop until None is eventually returned by the continuation method. Next, the buildNextSync method is invoked. Here, various call parameters can be cleaned up and adjusted for the next synchronisation cycle. The method must return a valid ApiEndpointCall object to persist in the database, and once it has completed the round is considered finished.

The implementation of these methods for FacebookFeedInterface looks like this:

def buildContinuation(content: JsValue, params: ApiEndpointCall): Option[ApiEndpointCall] = {
  val maybeNextPage = (content \ "paging" \ "next").asOpt[String]
  val maybeSinceParam = params.pathParameters.get("since")

  maybeNextPage.map { nextPage =>
    val nextPageUri = Uri(nextPage)
    val updatedQueryParams = params.queryParameters ++ nextPageUri.query().toMap

    if (maybeSinceParam.isDefined) {
      params.copy(queryParameters = updatedQueryParams)
    }
    else {
      (content \ "paging" \ "previous").asOpt[String].flatMap { previousPage =>
        val previousPageUri = Uri(previousPage)
        previousPageUri.query().get("since").map { sinceParam =>
          val updatedPathParams = params.pathParameters + ("since" -> sinceParam)
          params.copy(pathParameters = updatedPathParams, queryParameters = updatedQueryParams)
        }
      }.getOrElse {
        params.copy(queryParameters = updatedQueryParams)
      }
    }
  }
}

def buildNextSync(content: JsValue, params: ApiEndpointCall): ApiEndpointCall = {
  val maybeSinceParam = params.pathParameters.get("since")
  val updatedQueryParams = params.queryParameters - "__paging_token" - "until" - "access_token"

  maybeSinceParam.map { sinceParam =>
    params.copy(pathParameters = params.pathParameters - "since", queryParameters = updatedQueryParams + ("since" -> sinceParam))
  }.getOrElse {
    val maybePreviousPage = (content \ "paging" \ "previous").asOpt[String]
    maybePreviousPage.flatMap { previousPage =>
      Uri(previousPage).query().get("since").map { newSinceParam =>
        params.copy(queryParameters = params.queryParameters + ("since" -> newSinceParam))
      }
    }.getOrElse {
      logger.warn("Could not extract previous page 'since' parameter so the new value is not set. Was the feed list empty?")
      params
    }
  }
}

A common pattern across multiple APIs is that during the initial sync the plugin needs to paginate over multiple responses, and then switches into a "regular" synchronisation mode afterwards.
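
To illustrate, here is a minimal sketch of that pattern inside a DataPlugEndpointInterface implementation. It assumes an imaginary API that returns a nextPage cursor while older data is still being paginated and a latestTimestamp field describing the newest item; the JSON paths and parameter names are illustrative, not taken from any real API:

// Sketch only: keep requesting pages while the API offers a cursor,
// then fall back to incremental "since"-style syncs once caught up.
def buildContinuation(content: JsValue, params: ApiEndpointCall): Option[ApiEndpointCall] = {
  (content \ "nextPage").asOpt[String].map { cursor =>
    // More pages left in this round: carry the cursor into the next request.
    params.copy(queryParameters = params.queryParameters + ("page" -> cursor))
  } // returning None ends the current synchronisation round
}

def buildNextSync(content: JsValue, params: ApiEndpointCall): ApiEndpointCall = {
  // Drop the pagination cursor and remember the newest item seen, so the
  // next cycle only asks for data created after it.
  val withoutCursor = params.copy(queryParameters = params.queryParameters - "page")
  (content \ "latestTimestamp").asOpt[String]
    .map(ts => withoutCursor.copy(queryParameters = withoutCursor.queryParameters + ("since" -> ts)))
    .getOrElse(withoutCursor)
}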

Validating data

Validating data structures coming through the plugin is completely optional, because the PDA API will accept any data that can be presented as a valid JSON object. However, having certain guarantees about the data can be very valuable when developing apps that use it, and can thus increase the reach and impact of the plug. We strongly encourage using the validation process, especially because it is very straightforward to implement.

We'll use the FacebookFeedInterface example here. Any data coming through the interface is checked to meet minimum structure requirements. This works by describing the expected data fields and their types as a FacebookPost case class and attempting to cast a given JSON object into it.

case class FacebookPost(
    id: String,
    caption: Option[String],
    created_time: String,
    description: Option[String],
    link: Option[String],
    message: Option[String],
    name: Option[String],
    object_id: Option[String],
    place: Option[FacebookPlace],
    picture: Option[String],
    full_picture: Option[String],
    status_type: Option[String],
    story: Option[String],
    `type`: String,
    updated_time: String,
    from: FacebookFrom,
    privacy: FacebookPrivacy,
    application: Option[FacebookApplication])
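
For the data.validate[List[FacebookPost]] calls shown below to work, implicit play-json Reads (or Format) instances for FacebookPost and its nested case classes must be in scope. The existing plugins define these alongside the case classes; the following is a minimal sketch of what such definitions look like (the object name is illustrative):

import play.api.libs.json.{ Format, Json }

object FacebookPostJsonProtocol {
  // Formats for nested structures need to be defined before the ones that use them.
  // Any further nested types (e.g. a location inside FacebookPlace) would need their own formats too.
  implicit val facebookPlaceFormat: Format[FacebookPlace] = Json.format[FacebookPlace]
  implicit val facebookFromFormat: Format[FacebookFrom] = Json.format[FacebookFrom]
  implicit val facebookPrivacyFormat: Format[FacebookPrivacy] = Json.format[FacebookPrivacy]
  implicit val facebookApplicationFormat: Format[FacebookApplication] = Json.format[FacebookApplication]
  implicit val facebookPostFormat: Format[FacebookPost] = Json.format[FacebookPost]
}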

The interface contains the validateMinDataStructure method that implements the validation procedure:

def validateMinDataStructure(rawData: JsValue): Try[JsArray] = {
  (rawData \ "data").toOption.map {
    case data: JsArray if data.validate[List[FacebookPost]].isSuccess =>
      logger.info(s"Validated JSON array of ${data.value.length} items.")
      Success(data)
    case data: JsArray =>
      logger.warn(s"Could not validate full item list. Parsing ${data.value.length} data items one by one.")
      Success(JsArray(data.value.filter(_.validate[FacebookPost].isSuccess)))
    case data: JsObject =>
      logger.error(s"Error validating data, some of the required fields missing:\n${data.toString}")
      Failure(SourceDataProcessingException(s"Error validating data, some of the required fields missing."))
    case data =>
      logger.error(s"Error parsing JSON object: ${data.validate[List[FacebookPost]]}")
      Failure(SourceDataProcessingException(s"Error parsing JSON object."))
  }.getOrElse {
    logger.error(s"Error parsing JSON object, necessary property not found: ${rawData.toString}")
    Failure(SourceDataProcessingException(s"Error parsing JSON object, necessary property not found."))
  }
}

The method first checks whether the "data" key exists. If it does, it proceeds to validate the structure against one of several possible cases. The strictest case simply tries to cast data as a List of FacebookPosts. If that fails, it tries to treat data as a generic array and filter out objects that do not conform to the FacebookPost data structure. If the data cannot be parsed as an array at all, the validation method returns a Failure object, which in turn prevents the data from being posted to the PDA. The logic can be seen in the overridden processResults method.

override protected def processResults(
  content: JsValue,
  hatAddress: String,
  hatClientActor: ActorRef,
  fetchParameters: ApiEndpointCall)(implicit ec: ExecutionContext, timeout: Timeout): Future[Unit] = {

  for {
    validatedData <- FutureTransformations.transform(validateMinDataStructure(content))
    _ <- uploadHatData(namespace, endpoint, validatedData, hatAddress, hatClientActor)
  } yield {
    logger.debug(s"Successfully synced new records for HAT $hatAddress")
  }
}

Essentially, the processResults method is just a wrapper around a for-comprehension that executes a list of asynchronous operations. It can be modified to remove the validation step altogether or to include an arbitrary number of additional operations as required. Examples of how dates are reformatted and/or inserted into data sets can be found in the FitbitProfileInterface and TwitterTweetsInterface interfaces.
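
As an illustration, here is a hedged sketch of how an additional transformation step could slot into the same for-comprehension. The transformData helper is hypothetical and not part of the core library; it simply copies created_time into an extra lastUpdated field, loosely in the spirit of the date handling in the Fitbit and Twitter plugins:

// Assumes the usual play-json imports (JsArray, JsObject, JsString) already used by the interface.
override protected def processResults(
  content: JsValue,
  hatAddress: String,
  hatClientActor: ActorRef,
  fetchParameters: ApiEndpointCall)(implicit ec: ExecutionContext, timeout: Timeout): Future[Unit] = {

  for {
    validatedData <- FutureTransformations.transform(validateMinDataStructure(content))
    transformedData = transformData(validatedData) // hypothetical extra step
    _ <- uploadHatData(namespace, endpoint, transformedData, hatAddress, hatClientActor)
  } yield {
    logger.debug(s"Successfully synced new records for HAT $hatAddress")
  }
}

// Hypothetical helper: copy created_time into an additional lastUpdated field
// so that downstream apps can rely on a consistent timestamp key.
private def transformData(rawData: JsArray): JsArray = {
  JsArray(rawData.value.map { item =>
    (item \ "created_time").asOpt[String]
      .map(created => item.as[JsObject] + ("lastUpdated" -> JsString(created)))
      .getOrElse(item)
  })
}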

Collecting Endpoint Variants

Endpoint Options Collectors are used by the core plugin for two purposes: (1) to check whether the API resource is accessible at any given point in time, and (2) to set up a group of API endpoint interfaces for synchronisation. In this section, we'll see how the FacebookProfileCheck option collector is put together.

An Option Collector class should extend the DataPlugOptionsCollector trait and implement its abstract methods. As with API endpoint interfaces, the namespace, endpoint and defaultApiEndpoint parameters need to be supplied. The main purpose of the get method is to construct a sequence of ApiEndpointVariantChoices, which can then be used by the core plugin to run synchronisations. A very simple implementation would check if the selected endpoint is reachable and, if so, return a static list of API endpoint interfaces wrapped in ApiEndpointVariantChoice objects.

def get(fetchParams: ApiEndpointCall, hatAddress: String, hatClientActor: ActorRef)(implicit ec: ExecutionContext): Future[Seq[ApiEndpointVariantChoice]] = {
  authenticateRequest(fetchParams, hatAddress, refreshToken = false).flatMap { requestParams =>
    logger.info("Facebook profile check authenticated")
    buildRequest(requestParams).flatMap { response =>
      response.status match {
        case OK =>
          logger.info(s"API endpoint FacebookProfile validated for $hatAddress")
          Future.successful(staticEndpointChoices)
        case _ =>
          logger.warn(s"Could not validate FacebookProfile API endpoint $fetchParams — ${response.status}: ${response.body}")
          Future.failed(SourceDataProcessingException("Could not validate FacebookProfile API endpoint"))
      }
    }
  }.recover {
    case e =>
      logger.error(s"Failed to validate FacebookProfile API endpoint. Reason: ${e.getMessage}", e)
      throw e
  }
}

Of course, this method can be customised to dynamically retrieve and configure a list of API endpoints. One example of such an advanced use case can be found in the Google Calendar plugin.

Putting It Together

Once the authentication provider, endpoint interfaces and option collectors are ready, there are only a few steps left to get to a working plugin service:

  1. Configure the Play module

  2. Update database evolutions

  3. Update project's Build file configuration

Configuring Play Module

Within the Module file the plugin author can update bindings for the default core services and views. Usually, this is only necessary when deeper customisation of a particular plug is desired.

It is also the place to configure providers for the newly created endpoint interfaces and option collectors. There are four providers that need to be set up:

1 — Endpoint interfaces should be provided as a sequence wrapped in a DataPlugRegistry object

@Provides
def provideDataPlugCollection(
  facebookProfileInterface: FacebookProfileInterface,
  facebookProfilePictureInterface: FacebookProfilePictureInterface,
  facebookEventInterface: FacebookEventInterface,
  facebookFeedInterface: FacebookFeedInterface): DataPlugRegistry = {
  DataPlugRegistry(Seq(
    facebookProfileInterface,
    facebookProfilePictureInterface,
    facebookEventInterface,
    facebookFeedInterface))
}

2 — DataPlugOptionsCollectorRegistry should provide the option collectors coupled with an auth provider

@Provides
def provideDataPlugEndpointChoiceCollection(
  facebookProvider: FacebookProvider,
  facebookProfileCheck: FacebookProfileCheck): DataPlugOptionsCollectorRegistry = {
  val variants: Seq[(Provider, DataPlugOptionsCollector)] = Seq((facebookProvider, facebookProfileCheck))
  DataPlugOptionsCollectorRegistry(variants)
}

3 — SocialProviderRegistry should provide the authentication provider (default or custom built)

@Provides
def provideSocialProviderRegistry(
  facebookProvider: FacebookProvider): SocialProviderRegistry = {
  SocialProviderRegistry(Seq(
    facebookProvider))
}

4 — And, finally, we need to construct and provide the auth provider

@Provides
def provideFacebookProvider(
  httpLayer: HTTPLayer,
  stateProvider: OAuth2StateProvider,
  configuration: Configuration): FacebookProvider = {
  new FacebookProvider(httpLayer, stateProvider, configuration.underlying.as[OAuth2Settings]("silhouette.facebook"))
}

Updating Database Evolutions

Plugin-specific database evolutions can be found in the conf/evolutions/dataplug.sql file. Here, we simply need to insert a record for each endpoint that should be available for synchronisation. The record should include the name, description, and any text-based additional details about the endpoint. The Facebook example would include:

INSERT INTO dataplug_endpoint (name, description, details)
VALUES
('feed', 'User feed''s posts', 'sequence'),
('events', 'User''s events list', 'sequence'),
('profile', 'User''s Facebook profile information', 'snapshots'),
('profile/picture', 'User''s Facebook profile picture information', 'snapshots')
ON CONFLICT (name) DO NOTHING;

Updating Build File

As the final step, the plugin module should be added to the project list in the project/Build file. And that's it, your plugin is ready! Try running it with the sbt "project {project-name}" "run -Dconfig.resource=application.dev.conf" command.

More detail coming soon about:

  • Implementing custom user login interface

  • Setting up endpoints for webhooks