In this example, we will create a BigQuery time series connector that exports data to Google BigQuery.
Niagara Setup
- In the Workbench menu, go to Tools → New Module.
- Click Next.
- Leave the defaults and click Finish.
- Open the module in IntelliJ IDEA.
- Choose the project, then click OK.
- Build the project.
- You should see the modules added to your Niagara modules folder.
BigQuery Setup
- Go to your Google Cloud console.
- Create a new project.
- Give the project a name, then click Create.
- Go to IAM & Admin, then Service Accounts.
- Click Create Service Account.
- Give the account a name, then click Create.
- Grant it the BigQuery Admin role, then click Continue.
- Click Done.
- Select the account you created.
- Create a new key.
- Choose JSON, then click Create.
- A JSON key file will be downloaded.
- Enable billing for the project.
- Link a billing account.
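Before pasting the downloaded key into the connector, it can help to check that the file really is a service account key. The sketch below is a minimal, text-only check; the class and method names are hypothetical and not part of the connector, and it assumes the usual key layout with `type`, `project_id`, `private_key` and `client_email` fields.

```java
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical pre-flight check; not part of the connector. */
final class ServiceAccountKeyCheck {
    // Field names expected in a Google service account key file (assumption).
    private static final List<String> REQUIRED_FIELDS =
            List.of("project_id", "private_key", "client_email");

    private static final Pattern TYPE_IS_SERVICE_ACCOUNT =
            Pattern.compile("\"type\"\\s*:\\s*\"service_account\"");

    private ServiceAccountKeyCheck() {}

    /** Returns true when the text looks like a service account key. */
    static boolean looksLikeServiceAccountKey(String json) {
        if (json == null || !TYPE_IS_SERVICE_ACCOUNT.matcher(json).find()) {
            return false;
        }
        // Textual check only: this does not parse or validate the JSON.
        return REQUIRED_FIELDS.stream()
                .allMatch(field -> json.contains("\"" + field + "\""));
    }

    public static void main(String[] args) {
        String sample = "{ \"type\": \"service_account\", \"project_id\": \"demo\","
                + " \"private_key\": \"...\", \"client_email\": \"demo@example.com\" }";
        System.out.println(looksLikeServiceAccountKey(sample));
        System.out.println(looksLikeServiceAccountKey("{}"));
    }
}
```

This only catches obvious mistakes, such as pasting an OAuth client file instead of a service account key. The real validation happens when the client is built from the credentials.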
Writing Code
Add the BigQuery client library and the BTIB dependencies.
jar {
    from("src") {
        include "resource/**/*"
    }
}

dependencies {
    // Niagara
    compile "Tridium:nre:4.6"
    compile "Tridium:baja:4.6"
    compile "Tridium:alarm-rt:4.6"
    compile "Tridium:control-rt:4.6"

    // BTIB
    compile "BTIB:btibCore-rt:46"
    compile "BTIB:btibStructure-rt:46"
    compile "BTIB:btibStrategy-rt:46"
    compile "BTIB:btibDataFlow-rt:46"
    compile "BTIB:btibConnector-rt:46"

    // Big Query
    uberjar "com.google.cloud:google-cloud-bigquery:1.116.10"
}
Add the following permissions.
<permissions>
    <niagara-permission-groups type="station">
        <req-permission>
            <name>NETWORK_COMMUNICATION</name>
            <purposeKey>Outside access for the connector</purposeKey>
            <parameters>
                <parameter name="hosts" value="*"/>
                <parameter name="ports" value="*"/>
                <parameter name="proxySelector" value="get"/>
            </parameters>
        </req-permission>
        <req-permission>
            <name>GET_ENVIRONMENT_VARIABLES</name>
            <purposeKey>The bigquery needs access to environment variables</purposeKey>
            <parameters>
                <parameter name="variables" value="*"/>
            </parameters>
        </req-permission>
        <req-permission>
            <name>LOAD_LIBRARIES</name>
            <purposeKey>The bigquery needs to load native libs for grpc calls</purposeKey>
            <parameters>
                <parameter name="libraries" value="*"/>
            </parameters>
        </req-permission>
        <req-permission>
            <name>MANAGE_EXECUTION</name>
            <purposeKey>The connector at the base of structure must handle its own threads.</purposeKey>
        </req-permission>
        <req-permission>
            <name>REFLECTION</name>
            <purposeKey>Used by bigquery SDK to register points and devices</purposeKey>
        </req-permission>
    </niagara-permission-groups>
</permissions>
Add icons.
Implement the connector class.
@NiagaraType
@NiagaraProperty(
    name = "serviceAccountJson",
    type = "String",
    defaultValue = ""
)
public class BBigQueryTSConnector extends BTimeSeriesConnector {
    ...
}
Add the following fields.
public static final BtibLogger LOG = BtibLogger.getLogger(TYPE);
public static final BIcon ICON = BtibIconTool.getComponentIcon(TYPE);
public static final String DATASET = "dataset";
public static final String TABLE = "table";
Add the following helper methods.
////////////////////////////////////////////////////////////////
// Utils
////////////////////////////////////////////////////////////////

/**
 * Build the bigquery client
 *
 * @return
 * @throws Exception
 */
private BigQuery getBigQueryClient() throws Exception {
    return this.doPrivileged(() -> {
        ByteArrayInputStream credentialsStream = new ByteArrayInputStream(this.getServiceAccountJson().getBytes());
        return BigQueryOptions.newBuilder()
            .setCredentials(GoogleCredentials.fromStream(credentialsStream))
            .build()
            .getService();
    });
}

/**
 * Writes data to bigquery
 *
 * @param bigQueryClient
 * @param data
 * @param dataset
 * @param table
 */
private void writeData(BigQuery bigQueryClient, List<Map<String, Object>> data, String dataset, String table) {
    Table bqTable = bigQueryClient.getTable(dataset, table);
    Iterable<InsertAllRequest.RowToInsert> rows = data.stream()
        .map(element -> convertToBQType(element))
        .map(InsertAllRequest.RowToInsert::of)
        .collect(Collectors.toList());
    // skipInvalidRows = true, ignoreUnknownValues = true
    InsertAllResponse response = bqTable.insert(rows, true, true);
    response.getInsertErrors().forEach((key, value) ->
        System.out.println(value.stream().map(BigQueryError::getMessage).collect(Collectors.joining("\n"))));
}

/**
 * Converts types
 *
 * @param element
 * @return
 */
private Map<String, Object> convertToBQType(Map<String, Object> element) {
    return element.entrySet().stream()
        .map(entry -> {
            // Numbers are sent as doubles to match the FLOAT64 columns
            if (entry.getValue() instanceof Number) {
                Number value = (Number) entry.getValue();
                return new AbstractMap.SimpleEntry<>(entry.getKey(), value.doubleValue());
            }
            // Dates are sent as strings
            if (entry.getValue() instanceof Date) {
                Date value = (Date) entry.getValue();
                return new AbstractMap.SimpleEntry<>(entry.getKey(), value.toString());
            }
            // Collectors.toMap rejects null values, so nulls become the string "null"
            if (entry.getValue() == null) {
                return new AbstractMap.SimpleEntry<>(entry.getKey(), "null");
            }
            return entry;
        })
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
 * Creates or update the table
 *
 * @param bigQueryClient
 * @param dataset
 * @param table
 * @param element
 */
private void createOrUpdateTable(BigQuery bigQueryClient, String dataset, String table, Map<String, Object> element) {
    TableInfo tableInfo = this.buildTableInfo(dataset, table, element);
    Table bqTable = bigQueryClient.getTable(dataset, table);
    if (bqTable == null || !bqTable.exists()) {
        bigQueryClient.create(tableInfo);
    } else {
        bigQueryClient.update(tableInfo);
    }
}

/**
 * Build the table
 *
 * @param dataset
 * @param table
 * @param element
 * @return
 */
private TableInfo buildTableInfo(String dataset, String table, Map<String, Object> element) {
    List<Field> fields = element.entrySet().stream()
        .map((entry) -> Field.of(entry.getKey(), this.getBigQueryType(entry.getValue())))
        .collect(Collectors.toList());
    Schema schema = Schema.of(fields);
    TableDefinition tableDefinition = StandardTableDefinition.of(schema);
    return TableInfo.newBuilder(TableId.of(dataset, table), tableDefinition).build();
}

/**
 * Create the dataset if not exists
 *
 * @param bigQueryClient
 * @param dataset
 */
private void createDatasetIfNeeded(BigQuery bigQueryClient, String dataset) {
    Dataset bqDataset = bigQueryClient.getDataset(dataset);
    if (bqDataset == null) {
        bigQueryClient.create(DatasetInfo.newBuilder(dataset).build());
    }
}

/**
 * Map java types to bigquery types
 *
 * @param value
 * @return
 */
private StandardSQLTypeName getBigQueryType(Object value) {
    if (value instanceof Number) {
        return StandardSQLTypeName.FLOAT64;
    }
    if (value instanceof Boolean) {
        return StandardSQLTypeName.BOOL;
    }
    return StandardSQLTypeName.STRING;
}

/**
 * Run code in a privileged context with exception handling
 *
 * @param callable
 * @param <T>
 * @return
 * @throws Exception
 */
protected <T> T doPrivileged(Callable<T> callable) throws Exception {
    AtomicReference<Exception> exception = new AtomicReference<>();
    T returnValue = AccessController.doPrivileged((PrivilegedAction<T>) () -> {
        try {
            return callable.call();
        } catch (Exception e) {
            // Keep the exception so it can be rethrown outside the privileged block
            exception.set(e);
        }
        return null;
    });
    if (exception.get() != null) {
        throw exception.get();
    }
    return returnValue;
}
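Note that the table schema is built from a single row: the connector passes the first element of the batch to createOrUpdateTable, and the insert ignores unknown values, so a column that only appears in a later row would not be stored. A possible variation, sketched below in plain Java, is to derive the schema from every row. The class and method names are hypothetical, and the type names are plain strings standing in for StandardSQLTypeName so the example runs without the BigQuery library.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch; type names are strings standing in for StandardSQLTypeName. */
final class SchemaSketch {
    private SchemaSketch() {}

    /** Same rules as getBigQueryType, returned as text. */
    static String typeNameOf(Object value) {
        if (value instanceof Number) {
            return "FLOAT64";
        }
        if (value instanceof Boolean) {
            return "BOOL";
        }
        return "STRING";
    }

    /** Collects every column seen in any row; the first non-null value decides the type. */
    static Map<String, String> inferSchema(List<Map<String, Object>> rows) {
        Map<String, String> schema = new LinkedHashMap<>();
        Set<String> nullColumns = new LinkedHashSet<>();
        for (Map<String, Object> row : rows) {
            for (Map.Entry<String, Object> entry : row.entrySet()) {
                if (entry.getValue() == null) {
                    nullColumns.add(entry.getKey());
                } else {
                    schema.putIfAbsent(entry.getKey(), typeNameOf(entry.getValue()));
                }
            }
        }
        // Columns that were null in every row fall back to STRING, like the connector does.
        for (String column : nullColumns) {
            schema.putIfAbsent(column, "STRING");
        }
        return schema;
    }

    public static void main(String[] args) {
        Map<String, Object> first = Map.of("value", 21.5);
        Map<String, Object> second = Map.of("value", 22, "active", true);
        System.out.println(inferSchema(List.of(first, second)));
    }
}
```

In the connector, the resulting map could be turned into Field objects the same way buildTableInfo does for a single row.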
Implement the template methods.
////////////////////////////////////////////////////////////////
// BExternalConnector
////////////////////////////////////////////////////////////////

@Override
public void doPing() {
    this.setLastAttempt(BAbsTime.now());
    try {
        // Listing the datasets checks both the credentials and the network access
        BigQuery bigQuery = this.getBigQueryClient();
        bigQuery.listDatasets(BigQuery.DatasetListOption.all());
        this.setLastSuccess(BAbsTime.now());
        CompTool.setOk(this);
    } catch (Exception e) {
        e.printStackTrace();
        this.setLastFailure(BAbsTime.now());
        CompTool.setFault(this, e.getMessage(), e, LOG);
    }
}

////////////////////////////////////////////////////////////////
// BTimeSeriesConnector
////////////////////////////////////////////////////////////////

@Override
protected void export_(List<Map<String, Object>> data, Map<String, Object> tags, Map<String, String> options) throws TimeSeriesConnectorException {
    // Tags are stored as extra columns on every row
    data.forEach(e -> e.putAll(tags));
    String dataset = options.get(DATASET);
    if (dataset == null || dataset.isEmpty()) {
        throw new TimeSeriesConnectorException("Dataset is null or empty");
    }
    String table = options.get(TABLE);
    if (table == null || table.isEmpty()) {
        throw new TimeSeriesConnectorException("Table is null or empty");
    }
    if (data.isEmpty()) {
        return;
    }
    try {
        BigQuery bigQueryClient = this.getBigQueryClient();
        this.doPrivileged(() -> {
            this.createDatasetIfNeeded(bigQueryClient, dataset);
            // The schema is derived from the first row of the batch
            this.createOrUpdateTable(bigQueryClient, dataset, table, data.get(0));
            this.writeData(bigQueryClient, data, dataset, table);
            return null;
        });
    } catch (Exception e) {
        LOG.log(Level.SEVERE, e.getMessage(), e);
        throw new TimeSeriesConnectorException(e);
    }
}

@Override
protected void reset_(Map<String, String> options) throws TimeSeriesConnectorException {
    String dataset = options.get(DATASET);
    if (dataset == null || dataset.isEmpty()) {
        throw new TimeSeriesConnectorException("Dataset is null or empty");
    }
    String table = options.get(TABLE);
    if (table == null || table.isEmpty()) {
        throw new TimeSeriesConnectorException("Table is null or empty");
    }
    try {
        this.doPrivileged(() -> {
            BigQuery bigQueryClient = this.getBigQueryClient();
            Table bqTable = bigQueryClient.getTable(dataset, table);
            if (bqTable != null) {
                bqTable.delete();
            }
            return null;
        });
    } catch (Exception e) {
        throw new TimeSeriesConnectorException(e);
    }
}

@Override
public BtibLogger getBtibLogger() {
    return LOG;
}
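Both export_ and reset_ validate the same two options, and it is easy to read the wrong key when the checks are copied. One way to avoid the duplication, sketched here under the assumption that a small helper is acceptable in the connector, is a single lookup method. The class and method names are hypothetical, and IllegalArgumentException stands in for TimeSeriesConnectorException so the example runs on its own.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper; IllegalArgumentException stands in for TimeSeriesConnectorException. */
final class ConnectorOptions {
    static final String DATASET = "dataset";
    static final String TABLE = "table";

    private ConnectorOptions() {}

    /** Returns the option stored under the given key, or fails if it is missing or empty. */
    static String require(Map<String, String> options, String key) {
        String value = options.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Option '" + key + "' is null or empty");
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(DATASET, "niagara");
        options.put(TABLE, "export");
        // Each value is read from its own key.
        System.out.println(require(options, DATASET) + "." + require(options, TABLE));
    }
}
```

With such a helper, export_ and reset_ would each start with two calls, one per key, instead of two copied if blocks.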
And the icon.
@Override
public BIcon getIcon() {
    return ICON;
}
Implement the export operation.
public class BIgQueryExportOperation extends TimeSeriesExportOperation {
    private final Optional<BTimeSeriesConnector> connector;
    private final String dataset;
    private final BSFormat table;

    ////////////////////////////////////////////////////////////////
    // Constructors
    ////////////////////////////////////////////////////////////////

    /**
     * Constructor
     *
     * @param block
     * @param strategyExecution
     * @param log
     * @param context
     * @param connector
     * @param id
     * @param memoryAccess
     * @param artifacts
     * @param dataset
     * @param table
     */
    public BIgQueryExportOperation(BBigQueryExportBlock block, StrategyExecution strategyExecution, BLog log, Context context, Optional<BTimeSeriesConnector> connector, String id, MemoryAccess memoryAccess, BArtifacts artifacts, String dataset, BSFormat table) {
        super(id, memoryAccess, artifacts, block, log, context);
        this.connector = connector;
        this.dataset = dataset;
        this.table = table;
    }

    ////////////////////////////////////////////////////////////////
    // TimeSeriesExportOperation
    ////////////////////////////////////////////////////////////////

    @Override
    protected Optional<BTimeSeriesConnector> getTimeSeriesConnector() {
        return this.connector;
    }

    @Override
    protected Map<String, String> getConnectorExportOptions(BSourceTable sourceTable) throws TimeSeriesConnectorException {
        Map<String, String> options = new HashMap<>();
        options.put(BBigQueryTSConnector.DATASET, this.dataset);
        options.put(BBigQueryTSConnector.TABLE, this.getDestination(sourceTable));
        return options;
    }

    @Override
    protected String getDestination(BSourceTable sourceTable) throws TimeSeriesConnectorException {
        // Params
        Map<String, Object> params = new LinkedHashMap<>();
        params.put("origin", sourceTable);
        params.put("base", sourceTable);
        Object tableName = this.table.resolve(params, this.log, this.cx);
        if (tableName == null || tableName.toString().isEmpty()) {
            throw new TimeSeriesConnectorException("Table name is null or empty");
        }
        return tableName.toString();
    }
}
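Because the table name is resolved from an SFormat, it may contain characters taken from point or device names, such as spaces or dashes. The sketch below shows one conservative way to normalize the resolved name before using it as the destination. It assumes that restricting names to letters, digits and underscores is acceptable for your project (BigQuery itself may accept more), and the class and method names are hypothetical.

```java
/** Hypothetical sketch: restricts a resolved table name to letters, digits and underscores. */
final class TableNameSketch {
    private TableNameSketch() {}

    /** Trims the name and replaces every other character with an underscore. */
    static String sanitize(String resolved) {
        if (resolved == null || resolved.trim().isEmpty()) {
            throw new IllegalArgumentException("Table name is null or empty");
        }
        return resolved.trim().replaceAll("[^A-Za-z0-9_]", "_");
    }

    public static void main(String[] args) {
        System.out.println(sanitize("AHU-1 Supply Temp"));
    }
}
```

Two different source names can map to the same sanitized name, so this is only suitable when such collisions are acceptable or cannot occur in your naming scheme.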
Then implement the field editor (FE) in the wb module.
@NiagaraType
public class BBigQueryTSConnectorFE extends BTimeSeriesConnectorFE {
/*+ ------------ BEGIN BAJA AUTO GENERATED CODE ------------ +*/
/*@ $BBigQueryTSConnectorFE(2979906276)1.0$ @*/
/* Generated Tue Aug 25 17:16:43 CEST 2020 by Slot-o-Matic (c) Tridium, Inc. 2012 */

    ////////////////////////////////////////////////////////////////
    // Type
    ////////////////////////////////////////////////////////////////

    @Override
    public Type getType() { return TYPE; }
    public static final Type TYPE = Sys.loadType(BBigQueryTSConnectorFE.class);

/*+ ------------ END BAJA AUTO GENERATED CODE -------------- +*/

    @Override
    public boolean isConnectorValid(BExternalConnector connector) {
        return super.isConnectorValid(connector) && connector instanceof BBigQueryTSConnector;
    }
}
Now implement the export block.
@NiagaraType
@NiagaraProperty(
    name = "bigQueryConnector",
    type = "String",
    defaultValue = "",
    facets = @Facet(name = "BFacets.FIELD_EDITOR", value = "\"btibBigQuery:BigQueryTSConnectorFE\"")
)
@NiagaraProperty(
    name = "dataset",
    type = "String",
    defaultValue = "niagara"
)
@NiagaraProperty(
    name = "table",
    type = "BSFormat",
    defaultValue = "BSFormat.make(\"export\")"
)
public class BBigQueryExportBlock extends BTimeSeriesBlock {
/*+ ------------ BEGIN BAJA AUTO GENERATED CODE ------------ +*/
/*@ $BBigQueryExportBlock(3664760139)1.0$ @*/
/* Generated Tue Aug 25 17:33:51 CEST 2020 by Slot-o-Matic (c) Tridium, Inc. 2012 */

    ////////////////////////////////////////////////////////////////
    // Property "bigQueryConnector"
    ////////////////////////////////////////////////////////////////

    /**
     * Slot for the {@code bigQueryConnector} property.
     *
     * @see #getBigQueryConnector
     * @see #setBigQueryConnector
     */
    public static final Property bigQueryConnector = newProperty(0, "", BFacets.make(BFacets.FIELD_EDITOR, "btibBigQuery:BigQueryTSConnectorFE"));

    /**
     * Get the {@code bigQueryConnector} property.
     *
     * @see #bigQueryConnector
     */
    public String getBigQueryConnector() { return this.getString(bigQueryConnector); }

    /**
     * Set the {@code bigQueryConnector} property.
     *
     * @see #bigQueryConnector
     */
    public void setBigQueryConnector(String v) { this.setString(bigQueryConnector, v, null); }

    ////////////////////////////////////////////////////////////////
    // Property "dataset"
    ////////////////////////////////////////////////////////////////

    /**
     * Slot for the {@code dataset} property.
     *
     * @see #getDataset
     * @see #setDataset
     */
    public static final Property dataset = newProperty(0, "niagara", null);

    /**
     * Get the {@code dataset} property.
     *
     * @see #dataset
     */
    public String getDataset() { return this.getString(dataset); }

    /**
     * Set the {@code dataset} property.
     *
     * @see #dataset
     */
    public void setDataset(String v) { this.setString(dataset, v, null); }

    ////////////////////////////////////////////////////////////////
    // Property "table"
    ////////////////////////////////////////////////////////////////

    /**
     * Slot for the {@code table} property.
     *
     * @see #getTable
     * @see #setTable
     */
    public static final Property table = newProperty(0, BSFormat.make("export"), null);

    /**
     * Get the {@code table} property.
     *
     * @see #table
     */
    public BSFormat getTable() { return (BSFormat) this.get(table); }

    /**
     * Set the {@code table} property.
     *
     * @see #table
     */
    public void setTable(BSFormat v) { this.set(table, v, null); }

    ////////////////////////////////////////////////////////////////
    // Type
    ////////////////////////////////////////////////////////////////

    @Override
    public Type getType() { return TYPE; }
    public static final Type TYPE = Sys.loadType(BBigQueryExportBlock.class);

/*+ ------------ END BAJA AUTO GENERATED CODE -------------- +*/

    public static final BIcon ICON = BtibIconTool.getComponentIcon(TYPE);

    ////////////////////////////////////////////////////////////////
    // BTimeSeriesBlock
    ////////////////////////////////////////////////////////////////

    @Override
    protected String getConnectorHandle() {
        return this.getBigQueryConnector();
    }

    @Override
    protected Map<String, String> getConnectorResetOptions(String destination) {
        Map<String, String> options = new HashMap<>();
        options.put(BBigQueryTSConnector.DATASET, this.getDataset());
        options.put(BBigQueryTSConnector.TABLE, destination);
        return options;
    }

    @Override
    protected Operation makeOperation_(StrategyExecution strategyExecution, BLog log, Context context, Optional<BTimeSeriesConnector> connector, String id, MemoryAccess memoryAccess, BArtifacts artifacts) {
        return new BIgQueryExportOperation(this, strategyExecution, log, context, connector, id, memoryAccess, artifacts, this.getDataset(), this.getTable());
    }

    ////////////////////////////////////////////////////////////////
    // Getters / Setters
    ////////////////////////////////////////////////////////////////

    @Override
    public BIcon getIcon() {
        return ICON;
    }
}
Finally, add the connector and the exporter to the palette.
<?xml version="1.0" encoding="UTF-8"?>
<bajaObjectGraph version="4.0" reversibleEncodingKeySource="none" FIPSEnabled="false" reversibleEncodingValidator="[null.1]=">
    <p m="b=baja" t="b:UnrestrictedFolder">
        <!-- Connectors -->
        <p m="bbq=btibBigQuery" n="BigQueryTSConnector" t="bbq:BigQueryTSConnector"/>
        <!-- Blocks -->
        <p m="b=baja" n="Block" t="b:UnrestrictedFolder">
            <p n="BigQueryExportBlock" t="bbq:BigQueryExportBlock"/>
        </p>
    </p>
</bajaObjectGraph>
Now build and sign the module.
-> ./gradlew.bat assemble
-> ./scripts/selfSign/selfSign.bat
- Start a station and add the connector to the time series connectors folder.
- Paste the contents of the service account JSON key file into the connector's serviceAccountJson property.
- Create an export strategy and add the exporter.
- Trigger the export.
- You should see your data exported in BigQuery.
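The connector sends every row of an export in a single insert call. If large exports fail at this step, one option is to split the rows into smaller batches and insert each batch separately, since BigQuery applies quotas to insert requests. The sketch below shows only the splitting; the class name, method name and batch size are assumptions to tune for your data, not values taken from the connector.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: splits rows into fixed-size batches before inserting them. */
final class BatchSketch {
    private BatchSketch() {}

    /** Returns consecutive sublists of at most batchSize elements, in the original order. */
    static <T> List<List<T>> partition(List<T> rows, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < rows.size(); start += batchSize) {
            int end = Math.min(start + batchSize, rows.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(rows.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> rows = List.of(1, 2, 3, 4, 5);
        // Batch size 2 is only for the demonstration.
        System.out.println(partition(rows, 2));
    }
}
```

In writeData, the insert call would then run once per batch instead of once for the whole list.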
Full source code: https://github.com/VayanData/btibBigQuery