dalpuri
Socket.IO data service that exposes Prisma collectors over event-based RPC.
Primary use case:
- A remote system connects over Socket.IO.
- It authenticates using a pre-shared key (PSK).
- It invokes collector events (for example, fetchCompanies) with optional query options.
- It receives normalized success or error responses.
Stack
- Runtime: Bun
- Transport: Socket.IO
- Database: Microsoft SQL Server (READ-ONLY)
- ORM: Prisma with MSSQL adapter
⚠️ Important: Database is READ-ONLY
All database operations are SELECT queries only. No INSERT, UPDATE, DELETE, or other write operations are allowed. This service provides read-only access to ConnectWise Manage data.
Schema Corrections Applied
All product-related tables have been corrected to map to the actual database schema:
| Prisma Model | Corrected Table | Previous (Incorrect) |
|---|---|---|
| ProductCategory | IV_Category | |
| ProductSubcategory | IV_SubCategory | |
| ProductManufacturer | Manufacturer | |
| ProductCatalog | IV_Item | |
| ProductInventory | Inventory_By_Warehouse | |
| ItemVendor | iv_item_Vendor | |
Key Fixes:
- ✅ All @@map directives point to actual database tables
- ✅ All field names match database columns exactly
- ✅ Primary keys use correct RecID naming (e.g., ivItemRecId not catalogRecId)
- ✅ Removed incorrect ProductVendor model (table doesn't exist)
- ✅ Added proper ItemVendor junction table model
- ✅ Fixed TypeScript types (@db.UniqueIdentifier capitalization)
- ✅ Corrected all relationship hierarchies
Database Relations Reference
Product Models
ProductCategory (IV_Category)
Primary Key: ivCatRecId
Available Relations:
- subcategories → ProductSubcategory[] - All subcategories in this category
Example Include:
{
  include: ["subcategories"]
}
ProductSubcategory (IV_SubCategory)
Primary Key: ivSubcatRecId
Foreign Keys: ivCatRecId → ProductCategory
Available Relations:
- category → ProductCategory - Parent category
- products → ProductCatalog[] - All products in this subcategory
Example Include:
{
  include: ["category", "products"]
}
ProductManufacturer (Manufacturer)
Primary Key: manufacturerRecId
Available Relations:
- products → ProductCatalog[] - All products by this manufacturer
Example Include:
{
  include: ["products"]
}
ProductCatalog (IV_Item)
Primary Key: ivItemRecId
Foreign Keys: ivSubcatRecId → ProductSubcategory, manufacturerRecId → ProductManufacturer
Available Relations:
- subcategory → ProductSubcategory - Category info (can nest category for full hierarchy)
- manufacturer → ProductManufacturer - Manufacturer info
- inventory → ProductInventory[] - All inventory records across warehouses
- itemVendors → ItemVendor[] - All vendor relationships
Example Include:
{
  include: ["subcategory", "manufacturer", "inventory", "itemVendors"]
}
Nested Include for Full Category Hierarchy (Prisma syntax; collectors would need updating to accept it through opts):
{
  "include": {
    "subcategory": {
      "include": {
        "category": true
      }
    }
  }
}
ProductInventory (Inventory_By_Warehouse)
Primary Key: inventoryByWarehouseRecId
Foreign Keys: ivItemRecId → ProductCatalog
Available Relations:
- productCatalog → ProductCatalog - Parent product
Example Include:
{
  include: ["productCatalog"]
}
ItemVendor (iv_item_Vendor)
Primary Key: ivItemVendorId
Foreign Keys: ivItemRecId → ProductCatalog
Available Relations:
- product → ProductCatalog - Product this vendor supplies
Example Include:
{
  include: ["product"]
}
Company Models
Company
Primary Key: companyRecId
Available Relations:
- timeZone → TimeZone
- companyStatus → CompanyStatus
- taxCode → TaxCode
- currency → Currency
- ownerLevel → OwnerLevel
- market → Market
- ownershipType → OwnershipType
- billingTerms → BillingTerms
- billingDelivery → BillingDelivery
- srSignoff → SrSignoff
- ivPriceHeader → IvPriceHeader
- officeCalendar → OfficeCalendar
- customerUsageType → CustomerUsageType
- blInvtemplateSetup → BlInvTemplateSetup
- emailTemplate → EmailTemplate
- parentCompany → Company - Parent company (self-relation)
- childCompanies → Company[] - Child companies
- opportunityCompanies → SoOpportunity[] - Opportunities where this is the primary company
- opportunityShipToCompanies → SoOpportunity[] - Opportunities shipping to this company
- opportunityBillToCompanies → SoOpportunity[] - Opportunities billing to this company
Example Include:
{
  include: [
    "companyStatus",
    "ownerLevel",
    "opportunityCompanies",
    "parentCompany",
    "childCompanies"
  ]
}
Opportunity Models
SoOpportunity (SO_Opportunity)
Primary Key: opportunityRecId
Available Relations:
- company → Company - Primary company
- contact → Contact - Primary contact
- soPipeline → SoPipeline - Pipeline stage
- soInterest → SoInterest - Interest level
- billingUnit → BillingUnit - Billing unit
- contractType → ContractType - Contract type
- companyAddress → CompanyAddress - Company address
- soOppStatus → SoOppStatus - Opportunity status
- pmProject → PmProject - Related project
- ownerLevel → OwnerLevel - Owner level
- soType → SoType - Opportunity type
- marketingCampaign → MarketingCampaign - Marketing campaign
- agrType → AgrType - Agreement type
- srService → SrService - Service type
- soUrgency → SoUrgency - Urgency level
- approvedByMember → Member - Member who approved
- rejectedByMember → Member - Member who rejected
- shipToCompany → Company - Ship to company
- shipToContact → Contact - Ship to contact
- shipToCompanyAddress → CompanyAddress - Ship to address
- billToCompany → Company - Bill to company
- billToContact → Contact - Bill to contact
- billToCompanyAddress → CompanyAddress - Bill to address
- billingTerms → BillingTerms - Billing terms
- taxCode → TaxCode - Tax code
- currency → Currency - Currency
- techContact → Contact - Technical contact
Example Include:
{
  include: [
    "company",
    "contact",
    "soPipeline",
    "soOppStatus",
    "ownerLevel",
    "approvedByMember"
  ]
}
Member Models
Member
Primary Key: memberRecId
Available Relations:
- ownerLevel → OwnerLevel - Member owner level
- billingUnit → BillingUnit - Billing unit
- activityClass → ActivityClass - Activity class
- memberType → MemberType - Member type
- srOwnerLevel → OwnerLevel - Service request owner level
- srBillingUnit → BillingUnit - Service request billing unit
- schOwnerLevel → OwnerLevel - Schedule owner level
- schBillingUnit → BillingUnit - Schedule billing unit
- srBoard → SrBoard - Service board
- timeZone → TimeZone - Time zone
- brHeader → BrHeader - BR header
- activityType → ActivityType - Activity type
- pmOwnerLevel → OwnerLevel - Project management owner level
- pmBillingUnit → BillingUnit - Project management billing unit
- warehouse → Warehouse - Warehouse
- warehouseBin → WarehouseBin - Warehouse bin
- country → Country - Country
- directionalSync → DirectionalSync - Sync direction
- approvedOpportunities → SoOpportunity[] - Opportunities approved by this member
- rejectedOpportunities → SoOpportunity[] - Opportunities rejected by this member
Example Include:
{
  include: [
    "ownerLevel",
    "billingUnit",
    "memberType",
    "timeZone",
    "approvedOpportunities"
  ]
}
Contact Models
Contact
Primary Key: contactRecId
Available Relations:
- opportunityContacts → SoOpportunity[] - Opportunities where this is the primary contact
- opportunityShipToContacts → SoOpportunity[] - Opportunities shipping to this contact
- opportunityBillToContacts → SoOpportunity[] - Opportunities billing to this contact
- opportunityTechContacts → SoOpportunity[] - Opportunities where this is the technical contact
Example Include:
{
  include: ["opportunityContacts"]
}
Relationship Hierarchy Diagrams
Product Hierarchy
ProductCategory (IV_Category)
    ↓ one-to-many
ProductSubcategory (IV_SubCategory)
    ↓ one-to-many
ProductCatalog (IV_Item) ←──── ProductManufacturer (Manufacturer)
    ↓ one-to-many                  many-to-one ↗
    ├─→ ProductInventory (Inventory_By_Warehouse)
    └─→ ItemVendor (iv_item_Vendor)
Company/Opportunity Hierarchy
Company
├─→ opportunityCompanies (SoOpportunity[])
├─→ parentCompany (Company)
└─→ childCompanies (Company[])
SoOpportunity
├─→ company (Company)
├─→ contact (Contact)
├─→ soPipeline (SoPipeline)
├─→ soOppStatus (SoOppStatus)
├─→ ownerLevel (OwnerLevel)
└─→ approvedByMember (Member)
Environment Variables
Variables:
- DATABASE_URL (required): SQL Server connection string for Prisma
- PSK (required): pre-shared key checked during the Socket.IO handshake
- PORT (optional): server port, defaults to 3000
Example shape:
DATABASE_URL="sqlserver://host:1433;database=your_db;user=your_user;password=your_password;encrypt=true;trustServerCertificate=true"
PSK="replace-with-strong-shared-key"
PORT="3000"
Install And Run
Install dependencies:
bun install
Run in watch mode:
bun run dev
Generate Prisma client:
bun run db:generate
View database in Prisma Studio:
bun run db:studio
Type-check:
bunx tsc --noEmit
Runtime Flow
- Server starts from src/index.ts and calls startServer.
- Socket.IO server is created.
- Handshake authentication validates PSK.
- On each new socket connection, server scans src/collectors.
- For each .ts collector file, server registers an event named after the file name.
- Incoming event payload is passed to the collector as opts.
- Collector result is returned via callback in a standard envelope.
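The registration steps above can be sketched without a live server. This is a minimal sketch, not the service's actual code: SocketLike is a hypothetical stand-in for the part of a Socket.IO socket used here, and eventNameFromFile and registerCollectors are illustrative names. It assumes collectors are already loaded into a map keyed by file name.

```typescript
// Hypothetical sketch of collector registration; all names are illustrative.
type Envelope =
  | { success: true; data: unknown }
  | { success: false; error: string };

type Collector = (opts?: unknown) => Promise<unknown>;
type Ack = (res: Envelope) => void;

// Minimal stand-in for the part of a Socket.IO socket used here.
interface SocketLike {
  on(event: string, handler: (opts: unknown, ack: Ack) => void): void;
}

// "fetchCompanies.ts" -> "fetchCompanies"; files without a .ts suffix yield null.
function eventNameFromFile(fileName: string): string | null {
  return fileName.endsWith(".ts") ? fileName.slice(0, -3) : null;
}

// Registers one event per collector file and wraps each result in the envelope.
function registerCollectors(
  socket: SocketLike,
  files: Record<string, Collector>,
): string[] {
  const registered: string[] = [];
  for (const [fileName, collector] of Object.entries(files)) {
    const eventName = eventNameFromFile(fileName);
    if (eventName === null) continue; // skip non-collector files
    socket.on(eventName, (opts, ack) => {
      // Promise.resolve().then(...) also catches synchronous throws.
      Promise.resolve()
        .then(() => collector(opts))
        .then((data) => ack({ success: true, data }))
        .catch((err: unknown) =>
          ack({
            success: false,
            error: err instanceof Error ? err.message : String(err),
          }),
        );
    });
    registered.push(eventName);
  }
  return registered;
}
```

Keeping the envelope wrapping in one place means a collector only has to return rows or throw.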
Authentication Contract
Connection is rejected unless provided PSK matches server PSK.
Accepted handshake sources (first non-empty wins):
- auth.psk
- header x-psk
- query psk
Recommended client method:
- send psk in auth.psk
If auth fails:
- socket connection is denied with Unauthorized
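The "first non-empty wins" rule can be illustrated with a small pure function. This is a sketch under assumptions: HandshakeLike is a hypothetical shape that mirrors the auth, headers, and query fields of a Socket.IO handshake, and resolvePsk and isAuthorized are illustrative names, not the server's actual functions.

```typescript
// Hypothetical handshake shape; only the fields relevant to PSK lookup are modeled.
interface HandshakeLike {
  auth?: Record<string, unknown>;
  headers?: Record<string, string | string[] | undefined>;
  query?: Record<string, string | string[] | undefined>;
}

// Returns the first candidate that is a non-empty string, or undefined.
// Array values (repeated headers or query params) contribute their first entry.
function firstNonEmpty(...values: unknown[]): string | undefined {
  for (const value of values) {
    const candidate: unknown = Array.isArray(value) ? value[0] : value;
    if (typeof candidate === "string" && candidate.length > 0) return candidate;
  }
  return undefined;
}

// Resolution order from the contract: auth.psk, then header x-psk, then query psk.
function resolvePsk(handshake: HandshakeLike): string | undefined {
  return firstNonEmpty(
    handshake.auth?.["psk"],
    handshake.headers?.["x-psk"],
    handshake.query?.["psk"],
  );
}

// True only when a PSK was provided and equals the server's PSK.
function isAuthorized(handshake: HandshakeLike, serverPsk: string): boolean {
  const provided = resolvePsk(handshake);
  return provided !== undefined && provided === serverPsk;
}
```

An empty auth.psk falls through to the header and then to the query string, which matches the ordering described above.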
Collector Event Contract
Event Name
Event name is the collector file name without .ts.
Current collectors:
- fetchCompanies (from src/collectors/fetchCompanies.ts)
- fetchMembers (from src/collectors/fetchMembers.ts)
- fetchProducts (from src/collectors/fetchProducts.ts)
Event Signature
Client emits:
socket.emit(eventName, opts, callback);
Where:
- eventName: string
- opts: optional object
- callback: function receiving envelope
opts Shape
Collectors accept optional opts:
{
  select?: string[];
  include?: string[];
}
Rules:
- Omit opts entirely to fetch full records.
- Use select to request specific fields.
- Use include to request relations.
- Do not provide both select and include in the same call.
If both select and include are provided:
- collector throws
- callback returns success false with error message
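The rules above can be sketched as a function that turns opts into Prisma-style arguments. This is a hypothetical illustration: the service's real helper lives in src/helper/collectorQuery.ts and may behave differently. buildArgs, toFlagMap, and FindManyArgsLike are illustrative names, and treating an empty array as "not provided" is an assumption.

```typescript
// Hypothetical sketch; not the implementation in src/helper/collectorQuery.ts.
interface CollectorOpts {
  select?: string[];
  include?: string[];
}

// Simplified stand-in for Prisma findMany arguments.
interface FindManyArgsLike {
  select?: Record<string, true>;
  include?: Record<string, true>;
}

// ["a", "b"] -> { a: true, b: true }
function toFlagMap(names: string[]): Record<string, true> {
  const map: Record<string, true> = {};
  for (const name of names) map[name] = true;
  return map;
}

function buildArgs(opts?: CollectorOpts): FindManyArgsLike {
  // Assumption: an empty array is treated the same as an omitted option.
  const select = opts?.select?.length ? opts.select : undefined;
  const include = opts?.include?.length ? opts.include : undefined;
  if (select && include) {
    throw new Error("Use select or include, not both");
  }
  if (select) return { select: toFlagMap(select) };
  if (include) return { include: toFlagMap(include) };
  return {}; // no opts: fetch full records
}
```

For example, buildArgs({ select: ["companyId", "companyName"] }) yields { select: { companyId: true, companyName: true } }, while passing both options throws before any query is built.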
Response Envelope
Success:
{
  "success": true,
  "data": [ ... ]
}
Failure:
{
  "success": false,
  "error": "error message"
}
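On the client side, the envelope maps naturally onto a discriminated union. This is a minimal sketch under assumptions: Envelope, unwrap, emitAsync, and EmitterLike are hypothetical names, and EmitterLike models only the emit-with-callback call described in this document, not the full client API.

```typescript
// Hypothetical client-side types; T is the row type the caller expects.
type Envelope<T> =
  | { success: true; data: T[] }
  | { success: false; error: string };

// Returns the rows on success and throws on failure, so callers can use try/catch.
function unwrap<T>(res: Envelope<T>): T[] {
  if (res.success) return res.data;
  throw new Error(res.error);
}

// Minimal stand-in for socket.emit(eventName, opts, callback).
interface EmitterLike {
  emit(event: string, opts: unknown, ack: (res: Envelope<unknown>) => void): void;
}

// Promise wrapper around the callback-style emit.
function emitAsync<T>(
  socket: EmitterLike,
  event: string,
  opts?: unknown,
): Promise<T[]> {
  return new Promise<T[]>((resolve, reject) => {
    socket.emit(event, opts, (res) => {
      try {
        resolve(unwrap(res as Envelope<T>));
      } catch (err) {
        reject(err);
      }
    });
  });
}
```

With a helper like this, a caller could write `const rows = await emitAsync(socket, "fetchCompanies")` and handle failures in a single catch block instead of checking success in every callback.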
Concrete Usage Examples
1) Fetch all companies
socket.emit("fetchCompanies", undefined, (res) => {
  if (!res.success) return console.error(res.error);
  console.log(res.data);
});
2) Fetch selected company fields
socket.emit(
  "fetchCompanies",
  { select: ["companyId", "companyName"] },
  (res) => {
    if (!res.success) return console.error(res.error);
    console.log(res.data);
  },
);
3) Fetch companies with opportunities relation
socket.emit("fetchCompanies", { include: ["opportunityCompanies"] }, (res) => {
  if (!res.success) return console.error(res.error);
  console.log(res.data);
});
4) Fetch selected member fields
socket.emit(
  "fetchMembers",
  { select: ["memberId", "firstName", "lastName"] },
  (res) => {
    if (!res.success) return console.error(res.error);
    console.log(res.data);
  },
);
5) Fetch all products
socket.emit("fetchProducts", undefined, (res) => {
  if (!res.success) return console.error(res.error);
  console.log(res.data);
});
6) Fetch selected product fields
socket.emit(
  "fetchProducts",
  {
    select: [
      "ivItemRecId",
      "itemId",
      "description",
      "listPrice",
      "currentCost",
    ],
  },
  (res) => {
    if (!res.success) return console.error(res.error);
    console.log(res.data);
  },
);
7) Fetch products with subcategory, manufacturer, inventory, and item vendors
socket.emit(
  "fetchProducts",
  { include: ["subcategory", "manufacturer", "inventory", "itemVendors"] },
  (res) => {
    if (!res.success) return console.error(res.error);
    console.log(res.data);
  },
);
8) Fetch products with nested category hierarchy (requires collector changes)
// The current opts contract accepts include only as string[].
// Sending a nested include like this requires first updating the collector
// to support nested includes.
socket.emit(
  "fetchProducts",
  {
    include: {
      subcategory: {
        include: {
          category: true,
        },
      },
      manufacturer: true,
      inventory: true,
    },
  },
  (res) => {
    if (!res.success) return console.error(res.error);
    console.log(res.data);
  },
);
Reference Client (Node)
This is the easiest template to hand to another system:
import { io } from "socket.io-client";

const socket = io("http://localhost:3000", {
  auth: { psk: process.env.PSK },
  transports: ["websocket"],
});

socket.on("connect", () => {
  console.log("connected", socket.id);
  socket.emit("fetchCompanies", { select: ["companyId"] }, (res: any) => {
    if (!res.success) {
      console.error("collector error", res.error);
      return;
    }
    console.log("rows", res.data.length);
  });
});

socket.on("connect_error", (err) => {
  console.error("connect_error", err.message);
});
Collector Field Reference
fetchCompanies
Select fields: companyRecId, companyId, companyName, phoneNbr, websiteUrl, accountNbr, etc.
Include relations: timeZone, companyStatus, taxCode, currency, ownerLevel, market, billingTerms, billingDelivery, opportunityCompanies, etc.
fetchMembers
Select fields: memberRecId, memberId, firstName, lastName, emailAddress, inactiveFlag, title, etc.
Include relations: ownerLevel, billingUnit, memberType, timeZone, directionalSync, etc.
fetchProducts
Select fields: ivItemRecId, itemId, description, longDescription, ivSubcatRecId, manufacturerRecId, listPrice, currentCost, taxableFlag, inactiveFlag, mfgItemId, vendorSku, minimumStock, recurringFlag, lastUpdate, lastUpdateUtc, etc.
Include relations: subcategory, manufacturer, inventory, itemVendors
Note: The product model has been corrected to map to IV_Item table. Use the corrected field names as shown above.
How To Add A New Collector
- Create a new file in src/collectors, for example fetchOpportunities.ts.
- Export a default async function with optional opts argument.
- Use the helper in src/helper/collectorQuery.ts to process select/include.
- Restart server if needed.
Template:
import { prisma } from "../constants";
import { Prisma } from "../../generated/prisma/client";
import {
  buildCollectorFindManyArgs,
  CollectorQueryOptions,
} from "../helper/collectorQuery";

type OpportunityCollectorOpts = CollectorQueryOptions<
  Prisma.SoOpportunitySelect,
  Prisma.SoOpportunityInclude
>;

export default async (opts?: OpportunityCollectorOpts) => {
  const args: Prisma.SoOpportunityFindManyArgs = buildCollectorFindManyArgs<
    Prisma.SoOpportunitySelect,
    Prisma.SoOpportunityInclude
  >(opts);
  return prisma.soOpportunity.findMany(args);
};
Operational Notes
- The server currently trusts all CORS origins at the Socket.IO layer.
- Collector discovery happens at connection time by reading src/collectors.
- Keep collector file names stable because they define public event names.
- Database is READ-ONLY: Only SELECT queries are allowed.
Generated Data Files
The repository may contain generated JSON files (e.g., products-with-relations.json) that export sample data with all relationships included. These are useful for:
- Understanding the data structure
- Testing client applications
- Offline development
To regenerate product data:
bun run export:products
Field Name Reference (Product Models)
Old (Incorrect) vs New (Correct) Field Names
| Old Field Name | New Field Name | Description |
|---|---|---|
| catalogRecId | ivItemRecId | Primary key |
| cwCatalogId | N/A (removed) | No longer exists |
| identifier | itemId | Item identifier |
| productName | description | Product name |
| price | listPrice | List price |
| cost | currentCost | Current cost |
| categoryRecId | ivSubcatRecId | Subcategory FK |
| subcategoryRecId | N/A (use ivSubcatRecId) | Changed |
| manufacturerPartNum | mfgItemId | Manufacturer part |
| salesTaxableFlag | taxableFlag | Tax flag |
| cwLastUpdated | lastUpdate | Last update |
Product Relations Name Changes
| Old Relation | New Relation | Type |
|---|---|---|
| category | subcategory | Changed (now points to subcategory) |
| vendor | itemVendors | Changed (now junction table) |
| N/A | manufacturer | Added |
| inventory | inventory | Same (but different table) |
Important: When updating collectors or client code, use the new field names and relations shown above.
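For client code that still sends old product field names, the field table above can be turned into a small lookup. This is a sketch only: PRODUCT_FIELD_RENAMES and migrateProductSelect are hypothetical names that are not part of this repository, and dropping removed fields silently is an assumption a real migration might replace with a warning.

```typescript
// Old-to-new product field names copied from the field table above.
// null marks a field that was removed with no replacement.
const PRODUCT_FIELD_RENAMES: Record<string, string | null> = {
  catalogRecId: "ivItemRecId",
  cwCatalogId: null,
  identifier: "itemId",
  productName: "description",
  price: "listPrice",
  cost: "currentCost",
  categoryRecId: "ivSubcatRecId",
  subcategoryRecId: "ivSubcatRecId",
  manufacturerPartNum: "mfgItemId",
  salesTaxableFlag: "taxableFlag",
  cwLastUpdated: "lastUpdate",
};

// Rewrites a select list: renames old fields, drops removed ones,
// leaves unknown names untouched, and removes duplicates.
function migrateProductSelect(fields: string[]): string[] {
  const result: string[] = [];
  for (const field of fields) {
    const isKnown = Object.prototype.hasOwnProperty.call(PRODUCT_FIELD_RENAMES, field);
    const mapped = isKnown ? PRODUCT_FIELD_RENAMES[field] : field;
    if (mapped === null || mapped === undefined) continue; // removed field
    if (!result.includes(mapped)) result.push(mapped);
  }
  return result;
}
```

Because categoryRecId and subcategoryRecId both map to ivSubcatRecId, the duplicate check keeps the resulting select list free of repeated entries.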
Troubleshooting
Prisma Client Errors
If you see Prisma client errors, regenerate the client:
bun run db:generate
Connection Errors
- Verify DATABASE_URL in .env
- Check SQL Server connectivity
- Ensure MSSQL adapter is installed (@prisma/adapter-mssql)
Collector Errors
- Check that relation names are correct (use the reference above)
- Use select OR include, not both
- Verify field names match the schema