In this example we will create a BigQuery time series connector that exports data to the Google BigQuery engine.

Niagara Setup

  • Go to Workbench → Menu → Tools → New Module.



  • Click Next.
  • Leave the defaults and click Finish.
  • Open the module in IntelliJ IDEA.



  • Choose the project, then click OK.



  • Build the project.



  • You should see the modules added to your Niagara modules folder.


BigQuery Setup

  • Go to your Google Cloud Console.



  • Create a new project.



  • Give the project a name, then click Create.

  • Go to IAM & Admin, then Service Accounts.



  • Click Create Service Account.




  • Give your account a name, then click Create.



  • Give it the BigQuery Admin role, then click Continue.



  • Then click Done.



  • Select the account you just created.

  • Create a new key.

  • Choose JSON, then click Create.



  • A JSON key file will be downloaded.
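The downloaded key follows Google's standard service account key format; it looks roughly like this (all values below are placeholders):

```json
{
  "type": "service_account",
  "project_id": "my-project",
  "private_key_id": "…",
  "private_key": "-----BEGIN PRIVATE KEY-----\n…\n-----END PRIVATE KEY-----\n",
  "client_email": "my-account@my-project.iam.gserviceaccount.com",
  "client_id": "…",
  "token_uri": "https://oauth2.googleapis.com/token"
}
```

The whole file content is what you will later paste into the connector's serviceAccountJson property. Keep it secret: it grants BigQuery Admin access to your project.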



  • Enable billing for the project.



  • Link Account.

Writing Code

  • Add the BigQuery client library and BTIB dependencies.

    jar {
        from("src") {
            include "resource/**/*"
        }
    }
    
    dependencies {
        // Niagara
        compile "Tridium:nre:4.6"
        compile "Tridium:baja:4.6"
        compile "Tridium:alarm-rt:4.6"
        compile "Tridium:control-rt:4.6"
        // BTIB
        compile "BTIB:btibCore-rt:46"
        compile "BTIB:btibStructure-rt:46"
        compile "BTIB:btibStrategy-rt:46"
        compile "BTIB:btibDataFlow-rt:46"
        compile "BTIB:btibConnector-rt:46"
        // Big Query
        uberjar "com.google.cloud:google-cloud-bigquery:1.116.10"
    }
  • Add these permissions.

    <permissions>
      <niagara-permission-groups type="station">
        <req-permission>
          <name>NETWORK_COMMUNICATION</name>
          <purposeKey>Outside access for the connector</purposeKey>
          <parameters>
            <parameter name="hosts" value="*"/>
            <parameter name="ports" value="*"/>
            <parameter name="proxySelector" value="get"/>
          </parameters>
        </req-permission>
        <req-permission>
          <name>GET_ENVIRONMENT_VARIABLES</name>
          <purposeKey>The BigQuery SDK needs access to environment variables</purposeKey>
          <parameters>
            <parameter name="variables" value="*"/>
          </parameters>
        </req-permission>
        <req-permission>
          <name>LOAD_LIBRARIES</name>
          <purposeKey>The BigQuery SDK needs to load native libs for gRPC calls</purposeKey>
          <parameters>
            <parameter name="libraries" value="*"/>
          </parameters>
        </req-permission>
        <req-permission>
          <name>MANAGE_EXECUTION</name>
          <purposeKey>The connector at the base of structure must handle its own threads.</purposeKey>
        </req-permission>
        <req-permission>
          <name>REFLECTION</name>
          <purposeKey>Used by the BigQuery SDK to register points and devices</purposeKey>
        </req-permission>
      </niagara-permission-groups>
    </permissions>
  • Add icons.


  • Implement the connector class.

    @NiagaraType
    @NiagaraProperty(
        name = "serviceAccountJson",
        type = "String",
        defaultValue = ""
    )
    public class BBigQueryTSConnector extends BTimeSeriesConnector
    {
    ...
    }
  • Add these fields.

    public static final BtibLogger LOG = BtibLogger.getLogger(TYPE);
    public static final BIcon ICON = BtibIconTool.getComponentIcon(TYPE);
    
    public static final String DATASET = "dataset";
    public static final String TABLE = "table";
  • Add these helper methods.

    ////////////////////////////////////////////////////////////////
    // Utils
    ////////////////////////////////////////////////////////////////
    
    /**
     * Builds the BigQuery client from the service account JSON
     *
     * @return the BigQuery client
     * @throws Exception if the credentials cannot be read
     */
    private BigQuery getBigQueryClient() throws Exception
    {
        return this.doPrivileged(() -> {
            ByteArrayInputStream credentialsStream = new ByteArrayInputStream(this.getServiceAccountJson().getBytes());
            return BigQueryOptions.newBuilder()
                .setCredentials(GoogleCredentials.fromStream(credentialsStream))
                .build()
                .getService();
        });
    }
    
    /**
     * Writes data to BigQuery
     *
     * @param bigQueryClient
     * @param data
     * @param dataset
     * @param table
     */
    private void writeData(BigQuery bigQueryClient, List<Map<String, Object>> data, String dataset, String table)
    {
        Table bqTable = bigQueryClient.getTable(dataset, table);
        Iterable<InsertAllRequest.RowToInsert> rows = data.stream().map(element -> convertToBQType(element)).map(InsertAllRequest.RowToInsert::of).collect(Collectors.toList());
        InsertAllResponse response = bqTable.insert(rows, true, true);
        response.getInsertErrors().forEach((key, value) -> System.out.println(value.stream().map(BigQueryError::getMessage).collect(Collectors.joining("\n"))));
    }
    
    /**
     * Converts values to BigQuery-compatible types
     *
     * @param element
     * @return the converted row
     */
    private Map<String, Object> convertToBQType(Map<String, Object> element)
    {
        return element.entrySet().stream()
            .map(entry -> {
                if (entry.getValue() instanceof Number)
                {
                    Number value = (Number) entry.getValue();
                    return new AbstractMap.SimpleEntry<>(entry.getKey(), value.doubleValue());
                }
                if (entry.getValue() instanceof Date)
                {
                    Date value = (Date) entry.getValue();
                    return new AbstractMap.SimpleEntry<>(entry.getKey(), value.toString());
                }
                if (entry.getValue() == null)
                {
                    return new AbstractMap.SimpleEntry<>(entry.getKey(), "null");
                }
                return entry;
            })
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
    
    /**
     * Creates or updates the table
     *
     * @param bigQueryClient
     * @param dataset
     * @param table
     * @param element
     */
    private void createOrUpdateTable(BigQuery bigQueryClient, String dataset, String table, Map<String, Object> element)
    {
        TableInfo tableInfo = this.buildTableInfo(dataset, table, element);
        Table bqTable = bigQueryClient.getTable(dataset, table);
        if (bqTable == null || !bqTable.exists())
        {
            bigQueryClient.create(tableInfo);
        }
        else
        {
            bigQueryClient.update(tableInfo);
        }
    }
    
    /**
     * Builds the table info
     *
     * @param dataset
     * @param table
     * @param element
     * @return the table info
     */
    private TableInfo buildTableInfo(String dataset, String table, Map<String, Object> element)
    {
        List<Field> fields = element.entrySet().stream().map((entry) -> Field.of(entry.getKey(), this.getBigQueryType(entry.getValue()))).collect(Collectors.toList());
        Schema schema = Schema.of(fields);
        TableDefinition tableDefinition = StandardTableDefinition.of(schema);
        return TableInfo.newBuilder(TableId.of(dataset, table), tableDefinition).build();
    }
    
    /**
     * Creates the dataset if it does not exist
     *
     * @param bigQueryClient
     * @param dataset
     */
    private void createDatasetIfNeeded(BigQuery bigQueryClient, String dataset)
    {
        Dataset bqDataset = bigQueryClient.getDataset(dataset);
        if (bqDataset == null)
        {
            bigQueryClient.create(DatasetInfo.newBuilder(dataset).build());
        }
    }
    
    /**
     * Maps Java types to BigQuery types
     *
     * @param value
     * @return the matching BigQuery type
     */
    private StandardSQLTypeName getBigQueryType(Object value)
    {
        if (value instanceof Number)
        {
            return StandardSQLTypeName.FLOAT64;
        }
        if (value instanceof Boolean)
        {
            return StandardSQLTypeName.BOOL;
        }
        return StandardSQLTypeName.STRING;
    }
    
    /**
     * Runs code in a privileged context with exception handling
     *
     * @param callable
     * @param <T>
     * @return the callable's result
     * @throws Exception if the callable throws
     */
    protected <T> T doPrivileged(Callable<T> callable) throws Exception
    {
        AtomicReference<Exception> exception = new AtomicReference<>();
        T returnValue = AccessController.doPrivileged((PrivilegedAction<T>) () -> {
            try
            {
                return callable.call();
            }
            catch (Exception e)
            {
                exception.set(e);
            }
            return null;
        });
        if (exception.get() != null)
        {
            throw exception.get();
        }
        return returnValue;
    }
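As a quick, SDK-free illustration of the conversion rules used by convertToBQType and getBigQueryType above, the same mapping can be exercised with plain Java types (the class name BqTypeMappingDemo is invented for this sketch):

```java
import java.util.AbstractMap;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// SDK-free sketch of the type conversion rules used by the helpers above.
public class BqTypeMappingDemo
{
    // Numbers map to FLOAT64, booleans to BOOL, everything else to STRING.
    static String toBqType(Object value)
    {
        if (value instanceof Number) return "FLOAT64";
        if (value instanceof Boolean) return "BOOL";
        return "STRING";
    }

    // Numbers are widened to double, dates are stringified, and nulls become
    // the string "null" (Collectors.toMap rejects null values).
    static Map<String, Object> convert(Map<String, Object> row)
    {
        return row.entrySet().stream()
            .map(e -> {
                Object v = e.getValue();
                if (v instanceof Number) return new AbstractMap.SimpleEntry<String, Object>(e.getKey(), ((Number) v).doubleValue());
                if (v instanceof Date) return new AbstractMap.SimpleEntry<String, Object>(e.getKey(), v.toString());
                if (v == null) return new AbstractMap.SimpleEntry<String, Object>(e.getKey(), "null");
                return e;
            })
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args)
    {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("value", 21);           // int, widened to 21.0
        row.put("alarm", Boolean.TRUE);
        row.put("name", "sensor1");
        row.put("extra", null);         // null, exported as "null"

        System.out.println(convert(row));
        System.out.println(toBqType(row.get("value"))); // FLOAT64
    }
}
```

Note one quirk inherited from the original helpers: a null value is exported as the literal string "null", so a column whose first exported row is null will be created as STRING even if later rows are numeric.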
    
    
    
  • Implement the template methods.

    ////////////////////////////////////////////////////////////////
    // BExternalConnector
    ////////////////////////////////////////////////////////////////
    
    @Override
    public void doPing()
    {
        this.setLastAttempt(BAbsTime.now());
        try
        {
            BigQuery bigQuery = this.getBigQueryClient();
            bigQuery.listDatasets(BigQuery.DatasetListOption.all());
            this.setLastSuccess(BAbsTime.now());
            CompTool.setOk(this);
        }
        catch (Exception e)
        {
            e.printStackTrace();
            this.setLastFailure(BAbsTime.now());
            CompTool.setFault(this, e.getMessage(), e, LOG);
        }
    }
    
    ////////////////////////////////////////////////////////////////
    // BTimeSeriesConnector
    ////////////////////////////////////////////////////////////////
    
    @Override
    protected void export_(List<Map<String, Object>> data, Map<String, Object> tags, Map<String, String> options) throws TimeSeriesConnectorException
    {
        data.forEach(e -> e.putAll(tags));
        String dataset = options.get(DATASET);
        if (dataset == null || dataset.isEmpty())
        {
            throw new TimeSeriesConnectorException("Dataset is null or empty");
        }
    
        String table = options.get(TABLE);
        if (table == null || table.isEmpty())
        {
            throw new TimeSeriesConnectorException("Table is null or empty");
        }
    
        if (data.isEmpty())
        {
            return;
        }
    
        try
        {
            BigQuery bigQueryClient = this.getBigQueryClient();
            this.doPrivileged(() -> {
                this.createDatasetIfNeeded(bigQueryClient, dataset);
                this.createOrUpdateTable(bigQueryClient, dataset, table, data.get(0));
                this.writeData(bigQueryClient, data, dataset, table);
                return null;
            });
        }
        catch (Exception e)
        {
            LOG.log(Level.SEVERE, e.getMessage(), e);
            throw new TimeSeriesConnectorException(e);
        }
    }
    
    @Override
    protected void reset_(Map<String, String> options) throws TimeSeriesConnectorException
    {
        String dataset = options.get(DATASET);
        if (dataset == null || dataset.isEmpty())
        {
            throw new TimeSeriesConnectorException("Dataset is null or empty");
        }
    
        String table = options.get(TABLE);
        if (table == null || table.isEmpty())
        {
            throw new TimeSeriesConnectorException("Table is null or empty");
        }
    
        try
        {
            this.doPrivileged(() -> {
                BigQuery bigQueryClient = this.getBigQueryClient();
                Table bqTable = bigQueryClient.getTable(dataset, table);
                if (bqTable != null)
                {
                    bqTable.delete();
                }
                return null;
            });
        }
        catch (Exception e)
        {
            throw new TimeSeriesConnectorException(e);
        }
    }
    
    @Override
    public BtibLogger getBtibLogger()
    {
        return LOG;
    }
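Both export_ and reset_ funnel their BigQuery calls through the doPrivileged helper shown earlier. Its exception-tunneling trick (a PrivilegedAction cannot throw checked exceptions, so the Callable's exception is parked in an AtomicReference and rethrown afterwards) can be exercised standalone; PrivilegedDemo is an illustrative name for this sketch:

```java
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

// Standalone demo of the exception-tunneling pattern behind doPrivileged.
public class PrivilegedDemo
{
    static <T> T doPrivileged(Callable<T> callable) throws Exception
    {
        AtomicReference<Exception> exception = new AtomicReference<>();
        T returnValue = AccessController.doPrivileged((PrivilegedAction<T>) () -> {
            try
            {
                return callable.call();
            }
            catch (Exception e)
            {
                exception.set(e); // park the checked exception
                return null;
            }
        });
        if (exception.get() != null)
        {
            throw exception.get(); // rethrow outside the privileged scope
        }
        return returnValue;
    }

    public static void main(String[] args) throws Exception
    {
        // Success path: the value comes straight back.
        System.out.println(doPrivileged(() -> "connected"));

        // Failure path: the checked exception reaches the caller intact.
        try
        {
            doPrivileged(() -> { throw new Exception("bad credentials"); });
        }
        catch (Exception e)
        {
            System.out.println(e.getMessage());
        }
    }
}
```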
  • And the icon.

    @Override
    public BIcon getIcon()
    {
        return ICON;
    }
  • Implement the export operation.

    public class BIgQueryExportOperation extends TimeSeriesExportOperation
    {
        private final Optional<BTimeSeriesConnector> connector;
        private final String dataset;
        private final BSFormat table;
        ////////////////////////////////////////////////////////////////
        // Constructors
        ////////////////////////////////////////////////////////////////
    
        /**
         * Constructor
         *
         * @param block
         * @param strategyExecution
         * @param log
         * @param context
         * @param connector
         * @param id
         * @param memoryAccess
         * @param artifacts
         * @param dataset
         * @param table
         */
        public BIgQueryExportOperation(BBigQueryExportBlock block, StrategyExecution strategyExecution, BLog log, Context context, Optional<BTimeSeriesConnector> connector, String id, MemoryAccess memoryAccess, BArtifacts artifacts, String dataset, BSFormat table)
        {
            super(id, memoryAccess, artifacts, block, log, context);
            this.connector = connector;
            this.dataset = dataset;
            this.table = table;
        }
    
        ////////////////////////////////////////////////////////////////
        // TimeSeriesExportOperation
        ////////////////////////////////////////////////////////////////
    
        @Override
        protected Optional<BTimeSeriesConnector> getTimeSeriesConnector()
        {
            return this.connector;
        }
    
        @Override
        protected Map<String, String> getConnectorExportOptions(BSourceTable sourceTable) throws TimeSeriesConnectorException
        {
            Map<String, String> options = new HashMap<>();
            options.put(BBigQueryTSConnector.DATASET, this.dataset);
            options.put(BBigQueryTSConnector.TABLE, this.getDestination(sourceTable));
            return options;
        }
    
        @Override
        protected String getDestination(BSourceTable sourceTable) throws TimeSeriesConnectorException
        {
            // Params
            Map<String, Object> params = new LinkedHashMap<>();
            params.put("origin", sourceTable);
            params.put("base", sourceTable);
            Object tableName = this.table.resolve(params, this.log, this.cx);
    
            if (tableName == null || tableName.toString().isEmpty())
            {
                throw new TimeSeriesConnectorException("Table name is null or empty");
            }
    
            return tableName.toString();
        }
    }
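The operation and the connector communicate through a plain options map: the operation fills the DATASET and TABLE keys, and the connector validates both before touching BigQuery. A minimal sketch of that contract (class and method names here are invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the options-map contract between the export operation and the connector.
public class OptionsContractDemo
{
    // Same key strings as BBigQueryTSConnector.DATASET / TABLE.
    static final String DATASET = "dataset";
    static final String TABLE = "table";

    // Mirrors the validation at the top of export_ / reset_.
    static void validate(Map<String, String> options)
    {
        String dataset = options.get(DATASET);
        if (dataset == null || dataset.isEmpty())
        {
            throw new IllegalArgumentException("Dataset is null or empty");
        }
        String table = options.get(TABLE); // read from TABLE, not DATASET
        if (table == null || table.isEmpty())
        {
            throw new IllegalArgumentException("Table is null or empty");
        }
    }

    public static void main(String[] args)
    {
        Map<String, String> options = new HashMap<>();
        options.put(DATASET, "niagara");
        options.put(TABLE, "export");
        validate(options); // both keys present: passes
    }
}
```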
    
    
    
  • Then add the field editor (FE) in the wb module.

    @NiagaraType
    public class BBigQueryTSConnectorFE extends BTimeSeriesConnectorFE
    {
        /*+ ------------ BEGIN BAJA AUTO GENERATED CODE ------------ +*/
        /*@ $BBigQueryTSConnectorFE(2979906276)1.0$ @*/
        /* Generated Tue Aug 25 17:16:43 CEST 2020 by Slot-o-Matic (c) Tridium, Inc. 2012 */
    
        ////////////////////////////////////////////////////////////////
        // Type
        ////////////////////////////////////////////////////////////////
    
        @Override
        public Type getType()
        {
            return TYPE;
        }
    
        public static final Type TYPE = Sys.loadType(BBigQueryTSConnectorFE.class);
    
        /*+ ------------ END BAJA AUTO GENERATED CODE -------------- +*/
    
        @Override
        public boolean isConnectorValid(BExternalConnector connector)
        {
            return super.isConnectorValid(connector) && connector instanceof BBigQueryTSConnector;
        }
    }
  • Now the export block.

    @NiagaraType
    @NiagaraProperty(
        name = "bigQueryConnector",
        type = "String",
        defaultValue = "",
        facets = @Facet(name = "BFacets.FIELD_EDITOR", value = "\"btibBigQuery:BigQueryTSConnectorFE\"")
    )
    @NiagaraProperty(
        name = "dataset",
        type = "String",
        defaultValue = "niagara"
    )
    @NiagaraProperty(
        name = "table",
        type = "BSFormat",
        defaultValue = "BSFormat.make(\"export\")"
    )
    public class BBigQueryExportBlock extends BTimeSeriesBlock
    {
        /*+ ------------ BEGIN BAJA AUTO GENERATED CODE ------------ +*/
        /*@ $BBigQueryExportBlock(3664760139)1.0$ @*/
        /* Generated Tue Aug 25 17:33:51 CEST 2020 by Slot-o-Matic (c) Tridium, Inc. 2012 */
    
        ////////////////////////////////////////////////////////////////
        // Property "bigQueryConnector"
        ////////////////////////////////////////////////////////////////
    
        /**
         * Slot for the {@code bigQueryConnector} property.
         *
         * @see #getBigQueryConnector
         * @see #setBigQueryConnector
         */
        public static final Property bigQueryConnector = newProperty(0, "", BFacets.make(BFacets.FIELD_EDITOR, "btibBigQuery:BigQueryTSConnectorFE"));
    
        /**
         * Get the {@code bigQueryConnector} property.
         *
         * @see #bigQueryConnector
         */
        public String getBigQueryConnector()
        {
            return this.getString(bigQueryConnector);
        }
    
        /**
         * Set the {@code bigQueryConnector} property.
         *
         * @see #bigQueryConnector
         */
        public void setBigQueryConnector(String v)
        {
            this.setString(bigQueryConnector, v, null);
        }
    
        ////////////////////////////////////////////////////////////////
        // Property "dataset"
        ////////////////////////////////////////////////////////////////
    
        /**
         * Slot for the {@code dataset} property.
         *
         * @see #getDataset
         * @see #setDataset
         */
        public static final Property dataset = newProperty(0, "niagara", null);
    
        /**
         * Get the {@code dataset} property.
         *
         * @see #dataset
         */
        public String getDataset()
        {
            return this.getString(dataset);
        }
    
        /**
         * Set the {@code dataset} property.
         *
         * @see #dataset
         */
        public void setDataset(String v)
        {
            this.setString(dataset, v, null);
        }
    
        ////////////////////////////////////////////////////////////////
        // Property "table"
        ////////////////////////////////////////////////////////////////
    
        /**
         * Slot for the {@code table} property.
         *
         * @see #getTable
         * @see #setTable
         */
        public static final Property table = newProperty(0, BSFormat.make("export"), null);
    
        /**
         * Get the {@code table} property.
         *
         * @see #table
         */
        public BSFormat getTable()
        {
            return (BSFormat) this.get(table);
        }
    
        /**
         * Set the {@code table} property.
         *
         * @see #table
         */
        public void setTable(BSFormat v)
        {
            this.set(table, v, null);
        }
    
        ////////////////////////////////////////////////////////////////
        // Type
        ////////////////////////////////////////////////////////////////
    
        @Override
        public Type getType()
        {
            return TYPE;
        }
    
        public static final Type TYPE = Sys.loadType(BBigQueryExportBlock.class);
    
        /*+ ------------ END BAJA AUTO GENERATED CODE -------------- +*/
    
        public static final BIcon ICON = BtibIconTool.getComponentIcon(TYPE);
    
        ////////////////////////////////////////////////////////////////
        // BTimeSeriesBlock
        ////////////////////////////////////////////////////////////////
    
    
        @Override
        protected String getConnectorHandle()
        {
            return this.getBigQueryConnector();
        }
    
        @Override
        protected Map<String, String> getConnectorResetOptions(String destination)
        {
            Map<String, String> options = new HashMap<>();
            options.put(BBigQueryTSConnector.DATASET, this.getDataset());
            options.put(BBigQueryTSConnector.TABLE, destination);
            return options;
        }
    
        @Override
        protected Operation makeOperation_(StrategyExecution strategyExecution, BLog log, Context context, Optional<BTimeSeriesConnector> connector, String id, MemoryAccess memoryAccess, BArtifacts artifacts)
        {
            return new BIgQueryExportOperation(this, strategyExecution, log, context, connector, id, memoryAccess, artifacts, this.getDataset(), this.getTable());
        }
    
        ////////////////////////////////////////////////////////////////
        // Getters / Setters
        ////////////////////////////////////////////////////////////////
    
        @Override
        public BIcon getIcon()
        {
            return ICON;
        }
    }
  • Finally add the connector and the exporter to the palette.

    <?xml version="1.0" encoding="UTF-8"?>
    <bajaObjectGraph version="4.0" reversibleEncodingKeySource="none" FIPSEnabled="false"
                     reversibleEncodingValidator="[null.1]=">
        <p m="b=baja" t="b:UnrestrictedFolder">
            <!-- Connectors -->
            <p m="bbq=btibBigQuery" n="BigQueryTSConnector" t="bbq:BigQueryTSConnector"/>
            <!-- Blocks -->
            <p m="b=baja" n="Block" t="b:UnrestrictedFolder">
                <p n="BigQueryExportBlock" t="bbq:BigQueryExportBlock"/>
            </p>
    
        </p>
    </bajaObjectGraph>
  • Now build and sign the module.

    -> ./gradlew.bat assemble
    -> ./scripts/selfSign/selfSign.bat
  • Start a station and add the connector to the time series connectors folder.



  • Add the service account JSON data.

  • Create an export strategy and add the exporter.



  • Trigger the export.
  • You should see your data exported in BigQuery.


Full source code: https://github.com/VayanData/btibBigQuery