Prisma MSSQL adapter keeps connections open after the sync finishes, preventing the process from exiting naturally. The k8s job was staying in Running state indefinitely. Call process.exit(0) on success so the job completes and the GH workflow step passes.
dalpuri
Socket.IO data service that exposes Prisma collectors over event-based RPC.
Primary use case:
- A remote system connects over Socket.IO.
- It authenticates using a pre-shared key (PSK).
- It invokes collector events (for example, fetchCompanies) with optional query options.
- It receives normalized success or error responses.
Stack
- Runtime: Bun
- Transport: Socket.IO
- Database: Microsoft SQL Server (READ-ONLY)
- ORM: Prisma with MSSQL adapter
⚠️ Important: Database is READ-ONLY
All database operations are SELECT queries only. No INSERT, UPDATE, DELETE, or other write operations are allowed. This service provides read-only access to ConnectWise Manage data.
Schema Corrections Applied
All product-related tables have been corrected to map to the actual database schema:
| Prisma Model | Corrected Table | Previous (Incorrect) |
|---|---|---|
| ProductCategory | IV_Category |
|
| ProductSubcategory | IV_SubCategory |
|
| ProductManufacturer | Manufacturer |
|
| ProductCatalog | IV_Item |
|
| ProductInventory | Inventory_By_Warehouse |
|
| ItemVendor | iv_item_Vendor |
Key Fixes:
- ✅ All
@@mapdirectives point to actual database tables - ✅ All field names match database columns exactly
- ✅ Primary keys use correct RecID naming (e.g.,
ivItemRecIdnotcatalogRecId) - ✅ Removed incorrect
ProductVendormodel (table doesn't exist) - ✅ Added proper
ItemVendorjunction table model - ✅ Fixed TypeScript types (
@db.UniqueIdentifiercapitalization) - ✅ Corrected all relationship hierarchies
Database Relations Reference
Product Models
ProductCategory (IV_Category)
Primary Key: ivCatRecId
Available Relations:
subcategories→ ProductSubcategory[] - All subcategories in this category
Example Include:
{
include: ["subcategories"];
}
ProductSubcategory (IV_SubCategory)
Primary Key: ivSubcatRecId
Foreign Keys: ivCatRecId → ProductCategory
Available Relations:
category→ ProductCategory - Parent categoryproducts→ ProductCatalog[] - All products in this subcategory
Example Include:
{
include: ["category", "products"];
}
ProductManufacturer (Manufacturer)
Primary Key: manufacturerRecId
Available Relations:
products→ ProductCatalog[] - All products by this manufacturer
Example Include:
{
include: ["products"];
}
ProductCatalog (IV_Item)
Primary Key: ivItemRecId
Foreign Keys: ivSubcatRecId → ProductSubcategory, manufacturerRecId → ProductManufacturer
Available Relations:
subcategory→ ProductSubcategory - Category info (can nestcategoryfor full hierarchy)manufacturer→ ProductManufacturer - Manufacturer infoinventory→ ProductInventory[] - All inventory records across warehousesitemVendors→ ItemVendor[] - All vendor relationships
Example Include:
{
include: ["subcategory", "manufacturer", "inventory", "itemVendors"];
}
Nested Include for Full Category Hierarchy:
{
"include": {
"subcategory": {
"include": {
"category": true
}
}
}
}
ProductInventory (Inventory_By_Warehouse)
Primary Key: inventoryByWarehouseRecId
Foreign Keys: ivItemRecId → ProductCatalog
Available Relations:
productCatalog→ ProductCatalog - Parent product
Example Include:
{
include: ["productCatalog"];
}
ItemVendor (iv_item_Vendor)
Primary Key: ivItemVendorId
Foreign Keys: ivItemRecId → ProductCatalog
Available Relations:
product→ ProductCatalog - Product this vendor supplies
Example Include:
{
include: ["product"];
}
Company Models
Company
Primary Key: companyRecId
Available Relations:
timeZone→ TimeZonecompanyStatus→ CompanyStatustaxCode→ TaxCodecurrency→ CurrencyownerLevel→ OwnerLevelmarket→ MarketownershipType→ OwnershipTypebillingTerms→ BillingTermsbillingDelivery→ BillingDeliverysrSignoff→ SrSignoffivPriceHeader→ IvPriceHeaderofficeCalendar→ OfficeCalendarcustomerUsageType→ CustomerUsageTypeblInvtemplateSetup→ BlInvTemplateSetupemailTemplate→ EmailTemplateparentCompany→ Company - Parent company (self-relation)childCompanies→ Company[] - Child companiesopportunityCompanies→ SoOpportunity[] - Opportunities where this is the primary companyopportunityShipToCompanies→ SoOpportunity[] - Opportunities shipping to this companyopportunityBillToCompanies→ SoOpportunity[] - Opportunities billing to this company
Example Include:
{
include: [
"companyStatus",
"ownerLevel",
"opportunityCompanies",
"parentCompany",
"childCompanies",
];
}
Opportunity Models
SoOpportunity (SO_Opportunity)
Primary Key: opportunityRecId
Available Relations:
company→ Company - Primary companycontact→ Contact - Primary contactsoPipeline→ SoPipeline - Pipeline stagesoInterest→ SoInterest - Interest levelbillingUnit→ BillingUnit - Billing unitcontractType→ ContractType - Contract typecompanyAddress→ CompanyAddress - Company addresssoOppStatus→ SoOppStatus - Opportunity statuspmProject→ PmProject - Related projectownerLevel→ OwnerLevel - Owner levelsoType→ SoType - Opportunity typemarketingCampaign→ MarketingCampaign - Marketing campaignagrType→ AgrType - Agreement typesrService→ SrService - Service typesoUrgency→ SoUrgency - Urgency levelapprovedByMember→ Member - Member who approvedrejectedByMember→ Member - Member who rejectedshipToCompany→ Company - Ship to companyshipToContact→ Contact - Ship to contactshipToCompanyAddress→ CompanyAddress - Ship to addressbillToCompany→ Company - Bill to companybillToContact→ Contact - Bill to contactbillToCompanyAddress→ CompanyAddress - Bill to addressbillingTerms→ BillingTerms - Billing termstaxCode→ TaxCode - Tax codecurrency→ Currency - CurrencytechContact→ Contact - Technical contact
Example Include:
{
include: [
"company",
"contact",
"soPipeline",
"soOppStatus",
"ownerLevel",
"approvedByMember",
];
}
Member Models
Member
Primary Key: memberRecId
Available Relations:
ownerLevel→ OwnerLevel - Member owner levelbillingUnit→ BillingUnit - Billing unitactivityClass→ ActivityClass - Activity classmemberType→ MemberType - Member typesrOwnerLevel→ OwnerLevel - Service request owner levelsrBillingUnit→ BillingUnit - Service request billing unitschOwnerLevel→ OwnerLevel - Schedule owner levelschBillingUnit→ BillingUnit - Schedule billing unitsrBoard→ SrBoard - Service boardtimeZone→ TimeZone - Time zonebrHeader→ BrHeader - BR headeractivityType→ ActivityType - Activity typepmOwnerLevel→ OwnerLevel - Project management owner levelpmBillingUnit→ BillingUnit - Project management billing unitwarehouse→ Warehouse - WarehousewarehouseBin→ WarehouseBin - Warehouse bincountry→ Country - CountrydirectionalSync→ DirectionalSync - Sync directionapprovedOpportunities→ SoOpportunity[] - Opportunities approved by this memberrejectedOpportunities→ SoOpportunity[] - Opportunities rejected by this member
Example Include:
{
include: [
"ownerLevel",
"billingUnit",
"memberType",
"timeZone",
"approvedOpportunities",
];
}
Contact Models
Contact
Primary Key: contactRecId
Available Relations:
opportunityContacts→ SoOpportunity[] - Opportunities where this is primary contactopportunityShipToContacts→ SoOpportunity[] - Opportunities shipping to this contactopportunityBillToContacts→ SoOpportunity[] - Opportunities billing to this contactopportunityTechContacts→ SoOpportunity[] - Opportunities where this is technical contact
Example Include:
{
include: ["opportunityContacts"];
}
Relationship Hierarchy Diagrams
Product Hierarchy
ProductCategory (IV_Category)
↓ one-to-many
ProductSubcategory (IV_SubCategory)
↓ one-to-many
ProductCatalog (IV_Item) ←──── ProductManufacturer (Manufacturer)
↓ one-to-many many-to-one ↗
├─→ ProductInventory (Inventory_By_Warehouse)
└─→ ItemVendor (iv_item_Vendor)
Company/Opportunity Hierarchy
Company
├─→ opportunityCompanies (SoOpportunity[])
├─→ parentCompany (Company)
└─→ childCompanies (Company[])
SoOpportunity
├─→ company (Company)
├─→ contact (Contact)
├─→ soPipeline (SoPipeline)
├─→ soOppStatus (SoOppStatus)
├─→ ownerLevel (OwnerLevel)
└─→ approvedByMember (Member)
Environment Variables
Required variables:
- DATABASE_URL: SQL Server connection string for Prisma
- PSK: pre-shared key required for Socket.IO handshake
- PORT: optional server port (defaults to 3000)
Example shape:
DATABASE_URL="sqlserver://host:1433;database=your_db;user=your_user;password=your_password;encrypt=true;trustServerCertificate=true"
PSK="replace-with-strong-shared-key"
PORT="3000"
Install And Run
Install dependencies:
bun install
Run in watch mode:
bun run dev
Generate Prisma client:
bun run db:generate
View database in Prisma Studio:
bun run db:studio
Type-check:
bunx tsc --noEmit
Runtime Flow
- Server starts from src/index.ts and calls startServer.
- Socket.IO server is created.
- Handshake authentication validates PSK.
- On each new socket connection, server scans src/collectors.
- For each .ts collector file, server registers an event named after the file name.
- Incoming event payload is passed to the collector as opts.
- Collector result is returned via callback in a standard envelope.
Authentication Contract
Connection is rejected unless provided PSK matches server PSK.
Accepted handshake sources (first non-empty wins):
- auth.psk
- header x-psk
- query psk
Recommended client method:
- send psk in auth.psk
If auth fails:
- socket connection is denied with Unauthorized
Collector Event Contract
Event Name
Event name is the collector file name without .ts.
Current collectors:
- fetchCompanies (from src/collectors/fetchCompanies.ts)
- fetchMembers (from src/collectors/fetchMembers.ts)
- fetchProducts (from src/collectors/fetchProducts.ts)
Event Signature
Client emits:
socket.emit(eventName, opts, callback);
Where:
- eventName: string
- opts: optional object
- callback: function receiving envelope
opts Shape
Collectors accept optional opts:
{
select?: string[];
include?: string[];
}
Rules:
- Omit opts entirely to fetch full records.
- Use select to request specific fields.
- Use include to request relations.
- Do not provide both select and include in the same call.
If both select and include are provided:
- collector throws
- callback returns success false with error message
Response Envelope
Success:
{
"success": true,
"data": [ ... ]
}
Failure:
{
"success": false,
"error": "error message"
}
Concrete Usage Examples
1) Fetch all companies
socket.emit("fetchCompanies", undefined, (res) => {
if (!res.success) return console.error(res.error);
console.log(res.data);
});
2) Fetch selected company fields
socket.emit(
"fetchCompanies",
{ select: ["companyId", "companyName"] },
(res) => {
if (!res.success) return console.error(res.error);
console.log(res.data);
},
);
3) Fetch companies with opportunities relation
socket.emit("fetchCompanies", { include: ["opportunityCompanies"] }, (res) => {
if (!res.success) return console.error(res.error);
console.log(res.data);
});
4) Fetch selected member fields
socket.emit(
"fetchMembers",
{ select: ["memberId", "firstName", "lastName"] },
(res) => {
if (!res.success) return console.error(res.error);
console.log(res.data);
},
);
5) Fetch all products
socket.emit("fetchProducts", undefined, (res) => {
if (!res.success) return console.error(res.error);
console.log(res.data);
});
6) Fetch selected product fields
socket.emit(
"fetchProducts",
{
select: [
"ivItemRecId",
"itemId",
"description",
"listPrice",
"currentCost",
],
},
(res) => {
if (!res.success) return console.error(res.error);
console.log(res.data);
},
);
7) Fetch products with subcategory, manufacturer, and inventory
socket.emit(
"fetchProducts",
{ include: ["subcategory", "manufacturer", "inventory", "itemVendors"] },
(res) => {
if (!res.success) return console.error(res.error);
console.log(res.data);
},
);
8) Fetch products with nested category hierarchy
// To get the full category hierarchy, you can nest includes in the collector
// This would require updating the collector to support nested includes
socket.emit(
"fetchProducts",
{
include: {
subcategory: {
include: {
category: true,
},
},
manufacturer: true,
inventory: true,
},
},
(res) => {
if (!res.success) return console.error(res.error);
console.log(res.data);
},
);
Reference Client (Node)
This is the easiest template to hand to another system:
import { io } from "socket.io-client";
const socket = io("http://localhost:3000", {
auth: { psk: process.env.PSK },
transports: ["websocket"],
});
socket.on("connect", () => {
console.log("connected", socket.id);
socket.emit("fetchCompanies", { select: ["companyId"] }, (res: any) => {
if (!res.success) {
console.error("collector error", res.error);
return;
}
console.log("rows", res.data.length);
});
});
socket.on("connect_error", (err) => {
console.error("connect_error", err.message);
});
Collector Field Reference
fetchCompanies
Select fields: companyRecId, companyId, companyName, phoneNbr, websiteUrl, accountNbr, etc. Include relations: timeZone, companyStatus, taxCode, currency, ownerLevel, market, billingTerms, billingDelivery, opportunityCompanies, etc.
fetchMembers
Select fields: memberRecId, memberId, firstName, lastName, emailAddress, inactiveFlag, title, etc. Include relations: ownerLevel, billingUnit, memberType, timeZone, directionalSync, etc.
fetchProducts
Select fields: ivItemRecId, itemId, description, longDescription, ivSubcatRecId, manufacturerRecId, listPrice, currentCost, taxableFlag, inactiveFlag, mfgItemId, vendorSku, minimumStock, recurringFlag, lastUpdate, lastUpdateUtc, etc.
Include relations: subcategory, manufacturer, inventory, itemVendors
Note: The product model has been corrected to map to IV_Item table. Use the corrected field names as shown above.
How To Add A New Collector
- Create a new file in src/collectors, for example fetchOpportunities.ts.
- Export a default async function with optional opts argument.
- Use the helper in src/helper/collectorQuery.ts to process select/include.
- Restart server if needed.
Template:
import { prisma } from "../constants";
import { Prisma } from "../../generated/prisma/client";
import {
buildCollectorFindManyArgs,
CollectorQueryOptions,
} from "../helper/collectorQuery";
type OpportunityCollectorOpts = CollectorQueryOptions<
Prisma.SoOpportunitySelect,
Prisma.SoOpportunityInclude
>;
export default async (opts?: OpportunityCollectorOpts) => {
const args: Prisma.SoOpportunityFindManyArgs = buildCollectorFindManyArgs<
Prisma.SoOpportunitySelect,
Prisma.SoOpportunityInclude
>(opts);
return prisma.soOpportunity.findMany(args);
};
Operational Notes
- The server currently trusts all CORS origins at the Socket.IO layer.
- Collector discovery happens at connection time by reading src/collectors.
- Keep collector file names stable because they define public event names.
- Database is READ-ONLY: Only SELECT queries are allowed.
Generated Data Files
The repository may contain generated JSON files (e.g., products-with-relations.json) that export sample data with all relationships included. These are useful for:
- Understanding the data structure
- Testing client applications
- Offline development
To regenerate product data:
bun run export:products
Field Name Reference (Product Models)
Old (Incorrect) vs New (Correct) Field Names
| Old Field Name | New Field Name | Description |
|---|---|---|
catalogRecId |
ivItemRecId |
Primary key |
cwCatalogId |
N/A (removed) | No longer exists |
identifier |
itemId |
Item identifier |
productName |
description |
Product name |
price |
listPrice |
List price |
cost |
currentCost |
Current cost |
categoryRecId |
ivSubcatRecId |
Subcategory FK |
subcategoryRecId |
N/A (use ivSubcatRecId) | Changed |
manufacturerPartNum |
mfgItemId |
Manufacturer part |
salesTaxableFlag |
taxableFlag |
Tax flag |
cwLastUpdated |
lastUpdate |
Last update |
Product Relations Name Changes
| Old Relation | New Relation | Type |
|---|---|---|
category |
subcategory |
Changed (now points to subcategory) |
vendor |
itemVendors |
Changed (now junction table) |
| N/A | manufacturer |
Added |
inventory |
inventory |
Same (but different table) |
Important: When updating collectors or client code, use the new field names and relations shown above.
Troubleshooting
Prisma Client Errors
If you see Prisma client errors, regenerate the client:
bun run db:generate
Connection Errors
- Verify
DATABASE_URLin.env - Check SQL Server connectivity
- Ensure MSSQL adapter is installed (
@prisma/adapter-mssql)
Collector Errors
- Check that relation names are correct (use the reference above)
- Use
selectORinclude, not both - Verify field names match the schema