|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const { Pool } = require('pg'); |
|
|
require('dotenv').config(); |
|
|
|
|
|
const pool = new Pool({ connectionString: process.env.DATABASE_URL }); |
|
|
|
|
|
module.exports = { |
|
|
query: (text, params) => pool.query(text, params), |
|
|
pool |
|
|
}; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const { RedisPubSub } = require('graphql-redis-subscriptions'); |
|
|
const Redis = require('ioredis'); |
|
|
require('dotenv').config(); |
|
|
|
|
|
// Connection options shared by both Redis clients.
const options = {
  // Linear back-off between reconnect attempts: 50 ms per attempt, capped at 2 s.
  retryStrategy: (times) => Math.min(times * 50, 2000),
};

/**
 * Creates a Redis client for the URL in the REDIS_URL environment variable.
 *
 * @returns {Redis} A new ioredis client using the shared options.
 */
function createRedisClient() {
  return new Redis(process.env.REDIS_URL, options);
}

// Publisher and subscriber each get their own, separately constructed client.
const pubsub = new RedisPubSub({
  publisher: createRedisClient(),
  subscriber: createRedisClient(),
});

module.exports = pubsub;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const { gql } = require('apollo-server'); |
|
|
|
|
|
// GraphQL schema (SDL) for fault ingestion, confirmation and payouts.
const typeDefs = gql`
  # Custom scalars; their implementations are supplied with the resolvers.
  scalar DateTime
  scalar JSON

  enum Namespace {
    infrastructure
    satellite
  }

  # NOTE(review): declared but not referenced by any field below
  # (fault locations are typed as JSON) - confirm whether it is still needed.
  type Location {
    lat: Float
    lon: Float
  }

  type Provenance {
    source: String!
    license: String
    retrieved_at: DateTime!
  }

  type InfrastructureFault {
    id: ID!
    namespace: Namespace!
    type: String!
    timestamp: DateTime!
    location: JSON
    severity: Int!
    confirmed: Boolean!
    images: [String]
    provenance: Provenance
  }

  type Payout {
    id: ID!
    faultId: ID!
    amountMinorUnits: Int!
    currency: String!
    payeeId: String!
    status: String!
    createdAt: DateTime!
    settledAt: DateTime
    txRef: String
  }

  input IngestFaultInput {
    namespace: Namespace!
    type: String!
    timestamp: DateTime!
    location: JSON
    severity: Int!
    images: [String]
    provenance: JSON
  }

  input CreatePayoutInput {
    faultId: ID!
    amountMinorUnits: Int!
    currency: String!
    payeeId: String!
  }

  type Query {
    listInfraFaults(limit: Int = 50, offset: Int = 0): [InfrastructureFault!]
    payoutsForFault(faultId: ID!): [Payout!]
  }

  type Mutation {
    ingestFault(input: IngestFaultInput!): InfrastructureFault!
    confirmFault(id: ID!, confirmed: Boolean!): InfrastructureFault!
    createPayout(input: CreatePayoutInput!): Payout!
    settlePayout(payoutId: ID!): Payout!
  }

  type Subscription {
    faultCreated: InfrastructureFault!
    faultConfirmed: InfrastructureFault!
    payoutUpdated: Payout!
  }
`;

module.exports = typeDefs;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const { GraphQLScalarType, Kind } = require('graphql'); |
|
|
const db = require('./db'); |
|
|
const pubsub = require('./pubsub'); |
|
|
const { v4: uuidv4 } = require('uuid'); |
|
|
|
|
|
// Pub/sub trigger names: mutations publish to them and the Subscription
// resolvers listen on them.
const FAULT_CREATED = 'FAULT_CREATED';
const FAULT_CONFIRMED = 'FAULT_CONFIRMED';
const PAYOUT_UPDATED = 'PAYOUT_UPDATED';
|
|
|
|
|
/**
 * Converts a raw value into a Date and rejects anything that does not
 * represent a real point in time.
 *
 * @param {*} value - A Date, or anything accepted by the Date constructor.
 * @returns {Date} A valid Date instance.
 * @throws {TypeError} If the value cannot be parsed into a valid date.
 */
function toValidDate(value) {
  const date = value instanceof Date ? value : new Date(value);
  if (Number.isNaN(date.getTime())) {
    throw new TypeError('DateTime cannot represent an invalid date-time value');
  }
  return date;
}

/**
 * DateTime scalar: Date objects inside the server, ISO-8601 strings on the wire.
 */
const DateTime = new GraphQLScalarType({
  name: 'DateTime',
  description: 'ISO date-time scalar',

  // Variable input -> Date. Invalid input is rejected here rather than
  // flowing into resolvers as an "Invalid Date" object.
  parseValue: (value) => toValidDate(value),

  // Internal value -> ISO string. Also accepts non-Date values (for example
  // strings from a JSON round trip) by passing them through the Date constructor.
  serialize: (value) => toValidDate(value).toISOString(),

  // Inline literal -> Date. Only string literals are accepted; throwing keeps
  // a malformed literal from silently becoming null on a non-null field.
  parseLiteral(ast) {
    if (ast.kind !== Kind.STRING) {
      throw new TypeError('DateTime literal must be a string');
    }
    return toValidDate(ast.value);
  },
});
|
|
|
|
|
/**
 * Recursively converts a GraphQL literal AST node into the plain JSON value
 * it denotes.
 *
 * @param {object} ast - A GraphQL value node.
 * @param {object} [variables] - Variable values, used for variables nested
 *   inside object or list literals.
 * @returns {*} The corresponding JavaScript value; null for unsupported kinds.
 */
function parseJsonLiteral(ast, variables) {
  switch (ast.kind) {
    case Kind.STRING:
      return ast.value;
    case Kind.INT:
      return parseInt(ast.value, 10);
    case Kind.FLOAT:
      return parseFloat(ast.value);
    case Kind.BOOLEAN:
      // graphql-js supplies a real boolean here; the string form is still
      // accepted so any caller that relied on it keeps working.
      return ast.value === true || ast.value === 'true';
    case Kind.NULL:
      return null;
    case Kind.LIST:
      return ast.values.map((item) => parseJsonLiteral(item, variables));
    case Kind.OBJECT: {
      // Null-prototype object so that a key such as "__proto__" is stored as
      // ordinary data instead of changing the object's prototype.
      const result = Object.create(null);
      ast.fields.forEach((field) => {
        result[field.name.value] = parseJsonLiteral(field.value, variables);
      });
      return result;
    }
    case Kind.VARIABLE:
      return variables ? variables[ast.name.value] : undefined;
    default:
      return null;
  }
}

/**
 * JSON scalar: passes arbitrary JSON values through unchanged and understands
 * inline literals, including nested objects and lists.
 */
const JSONScalar = new GraphQLScalarType({
  name: 'JSON',
  description: 'Arbitrary JSON value',
  parseValue: (value) => value,
  serialize: (value) => value,
  parseLiteral(ast, variables) {
    return parseJsonLiteral(ast, variables);
  },
});
|
|
|
|
|
/**
 * Maps a row of the `objects` table to the InfrastructureFault GraphQL shape.
 *
 * @param {object} row - A row as returned by the database driver.
 * @returns {object} The fault with the field names used by the schema.
 */
function toFault(row) {
  return {
    id: row.id,
    namespace: row.namespace,
    type: row.type,
    timestamp: row.timestamp,
    location: row.location,
    severity: row.severity,
    confirmed: row.confirmed,
    images: row.images,
    provenance: row.provenance,
  };
}

/**
 * Maps a row of the `payouts` table to the Payout GraphQL shape
 * (snake_case columns to camelCase fields).
 *
 * @param {object} row - A row as returned by the database driver.
 * @returns {object} The payout with the field names used by the schema.
 */
function toPayout(row) {
  return {
    id: row.id,
    faultId: row.fault_id,
    // parseInt copes with the amount arriving as either a string or a number.
    amountMinorUnits: parseInt(row.amount_minor_units, 10),
    currency: row.currency,
    payeeId: row.payee_id,
    status: row.status,
    createdAt: row.created_at,
    settledAt: row.settled_at,
    txRef: row.tx_ref,
  };
}

// Events are published as the bare object, so each subscription field needs
// an explicit resolve; the default resolver would look for
// payload.<fieldName> and fail the non-null field.
const passThroughPayload = (payload) => payload;

const resolvers = {
  DateTime,
  JSON: JSONScalar,

  Query: {
    /**
     * Lists faults, newest first, with limit/offset paging.
     * NOTE(review): only rows of type 'pothole-detection' are returned even
     * though ingestFault accepts any type - confirm this filter is intended.
     */
    listInfraFaults: async (_, { limit, offset }) => {
      const result = await db.query(
        'SELECT * FROM objects WHERE type=$1 ORDER BY created_at DESC LIMIT $2 OFFSET $3',
        ['pothole-detection', limit, offset]
      );
      return result.rows.map(toFault);
    },

    /** Lists all payouts recorded for a fault, newest first. */
    payoutsForFault: async (_, { faultId }) => {
      const result = await db.query(
        'SELECT * FROM payouts WHERE fault_id=$1 ORDER BY created_at DESC',
        [faultId]
      );
      return result.rows.map(toPayout);
    },
  },

  Mutation: {
    /**
     * Stores a new fault under a freshly generated UUID and publishes it on
     * FAULT_CREATED.
     */
    ingestFault: async (_, { input }) => {
      const id = uuidv4();
      const q = `INSERT INTO objects (id, namespace, type, timestamp, location, severity, images, provenance)
        VALUES ($1,$2,$3,$4,$5,$6,$7,$8) RETURNING *`;
      const vals = [
        id,
        input.namespace,
        input.type,
        input.timestamp || new Date().toISOString(),
        JSON.stringify(input.location || null),
        input.severity,
        JSON.stringify(input.images || []),
        // Serialized explicitly like the other JSON values so that scalar
        // provenance values (e.g. a bare string) are also sent as valid JSON.
        JSON.stringify(input.provenance || {}),
      ];
      const result = await db.query(q, vals);
      const payload = toFault(result.rows[0]);
      await pubsub.publish(FAULT_CREATED, payload);
      return payload;
    },

    /**
     * Sets the confirmed flag of a fault and publishes the updated fault on
     * FAULT_CONFIRMED.
     *
     * @throws {Error} 'fault not found' when no row has the given id.
     */
    confirmFault: async (_, { id, confirmed }) => {
      const q = 'UPDATE objects SET confirmed=$1 WHERE id=$2 RETURNING *';
      const result = await db.query(q, [confirmed, id]);
      if (result.rowCount === 0) throw new Error('fault not found');
      const payload = toFault(result.rows[0]);
      await pubsub.publish(FAULT_CONFIRMED, payload);
      return payload;
    },

    /**
     * Records a payout in status 'created' and publishes it on PAYOUT_UPDATED.
     */
    createPayout: async (_, { input }) => {
      const id = uuidv4();
      const q = `INSERT INTO payouts (id, fault_id, amount_minor_units, currency, payee_id, status)
        VALUES ($1,$2,$3,$4,$5,'created') RETURNING *`;
      const vals = [id, input.faultId, input.amountMinorUnits, input.currency, input.payeeId];
      const result = await db.query(q, vals);
      const payload = toPayout(result.rows[0]);
      await pubsub.publish(PAYOUT_UPDATED, payload);
      return payload;
    },

    /**
     * Settles a payout inside one transaction while holding a row lock.
     * A payout that is not in status 'created' or 'processing' is returned
     * unchanged, so repeating the call does not settle it twice.
     *
     * @throws {Error} 'payout not found' when no row has the given id.
     */
    settlePayout: async (_, { payoutId }) => {
      const client = await db.pool.connect();
      let settledRow;
      try {
        await client.query('BEGIN');
        // FOR UPDATE makes concurrent settle attempts for this payout wait.
        const locked = await client.query('SELECT * FROM payouts WHERE id=$1 FOR UPDATE', [payoutId]);
        if (locked.rowCount === 0) throw new Error('payout not found');

        const current = locked.rows[0];
        if (current.status !== 'created' && current.status !== 'processing') {
          await client.query('ROLLBACK');
          return toPayout(current);
        }

        await client.query('UPDATE payouts SET status=$1 WHERE id=$2', ['processing', payoutId]);

        // Placeholder transaction reference; no payment provider is called here.
        const fakeTx = `TX-${Date.now()}-${Math.floor(Math.random() * 1000)}`;
        // RETURNING * yields the final row without a second query on another
        // pooled connection.
        const settled = await client.query(
          'UPDATE payouts SET status=$1, tx_ref=$2, settled_at=now() WHERE id=$3 RETURNING *',
          ['settled', fakeTx, payoutId]
        );
        await client.query('COMMIT');
        settledRow = settled.rows[0];
      } catch (err) {
        try {
          await client.query('ROLLBACK');
        } catch (rollbackErr) {
          // Keep the original failure as the one that reaches the caller.
          console.error('ROLLBACK failed while settling payout', rollbackErr);
        }
        throw err;
      } finally {
        client.release();
      }

      // Publish only after the commit and outside the transaction handling,
      // so a pub/sub failure cannot trigger a ROLLBACK of committed work.
      const payload = toPayout(settledRow);
      await pubsub.publish(PAYOUT_UPDATED, payload);
      return payload;
    },
  },

  Subscription: {
    faultCreated: {
      subscribe: () => pubsub.asyncIterator([FAULT_CREATED]),
      resolve: passThroughPayload,
    },
    faultConfirmed: {
      subscribe: () => pubsub.asyncIterator([FAULT_CONFIRMED]),
      resolve: passThroughPayload,
    },
    payoutUpdated: {
      subscribe: () => pubsub.asyncIterator([PAYOUT_UPDATED]),
      resolve: passThroughPayload,
    },
  },
};

module.exports = resolvers;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const { ApolloServer } = require('apollo-server'); |
|
|
const typeDefs = require('./schema'); |
|
|
const resolvers = require('./resolvers'); |
|
|
require('dotenv').config(); |
|
|
|
|
|
/**
 * Builds the Apollo server from the schema and resolvers and starts
 * listening on the PORT environment variable, falling back to 4000.
 *
 * @returns {Promise<void>} Resolves once the server is accepting requests.
 */
async function start() {
  const server = new ApolloServer({ typeDefs, resolvers });
  const { url } = await server.listen({ port: process.env.PORT || 4000 });
  console.log(`🚀 Server ready at ${url}`);
}

// Do not leave the promise floating: report a startup failure and exit
// with a non-zero status.
start().catch((err) => {
  console.error('Server failed to start', err);
  process.exit(1);
});
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|