Signald instead of signal-cli-rest-api made the system so much faster!

This commit is contained in:
Matěj Divecký 2020-09-28 18:17:07 +02:00
parent a7937ade6c
commit 6f3f18e75a
12 changed files with 349 additions and 74 deletions

4
.gitignore vendored
View file

@ -1,2 +1,4 @@
signal-cli-rest-api/signal-cli-config
signald/gradle/
signald/run/
registrations.json
*__pycache__

View file

@ -3,21 +3,19 @@
Set of docker containers for asynchronous Signal message processing (sending & receiving).
It can be used as a gateway for automated notifications, bot with automatic replies or Signal equivalent of Telegram public groups where individuals can't see other users in group.
## Signal-cli-rest-api
Slightly edited version of https://github.com/bbernhard/signal-cli-rest-api
Manages installation and running of signal-cli, registrations of new Signal accounts sending and receiving of Signal messages over REST API.
In future, it will probably be remade in Python to allow for receiving and sending attachments.
In future, it should use DBus for messages, because it is a LOT faster than calling signal-cli all the time.
## Signald
https://gitlab.com/thefinn93/signald
daemon that facilitates communication over Signal.
Using fixed pysignald https://gitlab.com/stavros/pysignald as python library. Latest version of pysignald is not compatible with changes in Signal protocol.
## registrations.json
Simple json file that receiver and sender scripts use as source of registered Signal accounts. At this time registrations need to be added manually to this file.
It's also possible to run signal-cli-rest-api on different server, or in multiple instances by using the "url" field-
## Receiver
Periodicaly asks signal-cli-rest-api for new messages on Signal servers for each registered number. If there is new message, it writes it to RabbitMQ (queue signal-receive) for processing.
Creates threads for each registered number (in registrations.json) and listens for new messages. If there is new message, it writes it to RabbitMQ (queue signal-receive) for processing.
## Sender
Listens for new messages in RabbitMQ (queue signal-send) and sends them to signal-cli-rest-api server accroding to registrations.json
Listens for new messages in RabbitMQ (queue signal-send) and sends them to signald.
## RabbitMQ
Stores received messages and messages that are supposed to be sent out.
@ -33,12 +31,36 @@ Right now, worker just takes received message and sends it back to the sender.
In future, there should be API for receiving messages from other apps and respond to automatic messages.
---
## New number registrations
For adding new number and testing it out, use this. (ofc not in one run...)
```
from signald import Signal #or import pysignald from /libs
s = Signal("+1234567890")
# If you haven't registered/verified signald, do that first:
s.register(voice=False)
s.verify("sms code")
s.send_message("+1098765432", "Hello there!")
for message in s.receive_messages():
print(message)
```
## Working scheme
```
+-------------------------+
| |
+--------------+ signal-cli-rest-api <------------------+
+--------------+ signald <------------------+
| | | |
| +-------------------------+ |
| |

View file

@ -1,10 +1,18 @@
version: "3"
services:
signal-api:
build: "./signal-cli-rest-api/."
# signal-api:
# build: "./signal-cli-rest-api/."
# volumes:
# - "./signal-cli-rest-api/signal-cli-config:/home/.local/share/signal-cli" #map "signal-cli-config" folder on host system into docker container. the folder contains the password and cryptographic keys when a new number is registered
# restart: unless-stopped
signald:
image: finn/signald:latest
volumes:
- "./signal-cli-rest-api/signal-cli-config:/home/.local/share/signal-cli" #map "signal-cli-config" folder on host system into docker container. the folder contains the password and cryptographic keys when a new number is registered
- './signald/gradle:/home/gradle'
- './signald/run:/var/run/signald'
restart: unless-stopped
rabbitmq:
@ -15,31 +23,36 @@ services:
build: "./receiver/."
depends_on:
- rabbitmq
- signal-api
- signald
environment:
RABBITMQ_HOST: rabbitmq
SOCKET_PATH: ./run/signald.sock
restart: on-failure
volumes:
- "./registrations.json:/usr/src/app/registrations.json"
- "./signald/run:/usr/src/app/run"
- "./libs/pysignald:/usr/src/app/pysignald"
sender:
build: "./sender/."
depends_on:
- rabbitmq
- signal-api
- signald
environment:
RABBITMQ_HOST: rabbitmq
restart: on-failure
SOCKET_PATH: ./run/signald.sock
restart: unless-stopped
volumes:
- "./registrations.json:/usr/src/app/registrations.json"
- "./signald/run:/usr/src/app/run"
- "./libs/pysignald:/usr/src/app/pysignald"
worker:
build: "./worker/."
depends_on:
- rabbitmq
- signal-api
- sender
- receiver
environment:
RABBITMQ_HOST: rabbitmq
restart: on-failure
restart: unless-stopped

View file

@ -0,0 +1,4 @@
# flake8: noqa
__version__ = "0.0.8"
from .main import Signal

228
libs/pysignald/main.py Normal file
View file

@ -0,0 +1,228 @@
import json
import random
import re
import socket
from typing import Iterator, List # noqa
from .pysignald_types import Attachment, Message
# We'll need to know the compiled RE object later.
RE_TYPE = type(re.compile(""))
def readlines(s: socket.socket) -> Iterator[bytes]:
"Read a socket, line by line."
buf = [] # type: List[bytes]
while True:
char = s.recv(1)
if not char:
raise ConnectionResetError("connection was reset")
if char == b"\n":
yield b"".join(buf)
buf = []
else:
buf.append(char)
class Signal:
def __init__(self, username, socket_path="/var/run/signald/signald.sock"):
self.username = username
self.socket_path = socket_path
self._chat_handlers = []
def _get_id(self):
"Generate a random ID."
return "".join(random.choice("abcdefghijklmnopqrstuvwxyz0123456789") for _ in range(10))
def _get_socket(self) -> socket.socket:
"Create a socket, connect to the server and return it."
# Support TCP sockets on the sly.
if isinstance(self.socket_path, tuple):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
else:
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
s.connect(self.socket_path)
return s
def _send_command(self, payload: dict, block: bool = False):
s = self._get_socket()
msg_id = self._get_id()
payload["id"] = msg_id
s.recv(1024) # Flush the buffer.
s.send(json.dumps(payload).encode("utf8") + b"\n")
if not block:
return
response = s.recv(4 * 1024)
for line in response.split(b"\n"):
if msg_id.encode("utf8") not in line:
continue
data = json.loads(line)
if data.get("id") != msg_id:
continue
if data["type"] == "unexpected_error":
raise ValueError("unexpected error occurred")
def register(self, voice=False):
"""
Register the given number.
voice: Whether to receive a voice call or an SMS for verification.
"""
payload = {"type": "register", "username": self.username, "voice": voice}
self._send_command(payload)
def verify(self, code: str):
"""
Verify the given number by entering the code you received.
code: The code Signal sent you.
"""
payload = {"type": "verify", "username": self.username, "code": code}
self._send_command(payload)
def receive_messages(self) -> Iterator[Message]:
"Keep returning received messages."
s = self._get_socket()
s.send(json.dumps({"type": "subscribe", "username": self.username}).encode("utf8") + b"\n")
for line in readlines(s):
try:
message = json.loads(line.decode())
except json.JSONDecodeError:
print("Invalid JSON")
if message.get("type") != "message":
continue
try:
body = message['data']['dataMessage']['body']
except:
continue
# If the message type isn't "message", or if it's a weird message whose
# purpose I don't know, return. I think the weird message is a typing
# notification.
# print("type: " + message.get("type"))
# print("Body: " + str(message["data"].get("dataMessage")))
# print('--------------------START------------------')
# print('--------------------END------------------')
# print("Exiting")
# continue
message = message["data"]
data_message = message.get("dataMessage", {})
yield Message(
username=message["username"],
source=message["source"],
text=data_message["body"],
source_device=message["sourceDevice"],
timestamp=data_message.get("timestamp"),
timestamp_iso=message["timestampISO"],
expiration_secs=data_message.get("expiresInSeconds"),
group_info=data_message.get("groupInfo", {}),
attachments=[
Attachment(
content_type=attachment["contentType"],
id=attachment["id"],
size=attachment["size"],
stored_filename=attachment["storedFilename"],
)
for attachment in data_message.get("attachments", [])
],
)
def send_message(self, recipient: str, text: str, block: bool = True) -> None:
"""
Send a message.
recipient: The recipient's phone number, in E.123 format.
text: The text of the message to send.
block: Whether to block while sending. If you choose not to block, you won't get an exception if there
are any errors.
"""
payload = {"type": "send", "username": self.username, "recipientAddress": recipient, "messageBody": text}
self._send_command(payload, block)
def send_group_message(self, recipient_group_id: str, text: str, block: bool = False) -> None:
"""
Send a group message.
recipient_group_id: The base64 encoded group ID to send to.
text: The text of the message to send.
block: Whether to block while sending. If you choose not to block, you won't get an exception if
there are any errors.
"""
payload = {
"type": "send",
"username": self.username,
"recipientGroupId": recipient_group_id,
"messageBody": text,
}
self._send_command(payload, block)
def chat_handler(self, regex, order=100):
"""
A decorator that registers a chat handler function with a regex.
"""
if not isinstance(regex, RE_TYPE):
regex = re.compile(regex, re.I)
def decorator(func):
self._chat_handlers.append((order, regex, func))
# Use only the first value to sort so that declaration order doesn't change.
self._chat_handlers.sort(key=lambda x: x[0])
return func
return decorator
def run_chat(self):
"""
Start the chat event loop.
"""
for message in self.receive_messages():
if not message.text:
continue
for _, regex, func in self._chat_handlers:
match = re.search(regex, message.text)
if not match:
continue
try:
reply = func(message, match)
except: # noqa - We don't care why this failed.
continue
if isinstance(reply, tuple):
stop, reply = reply
else:
stop = True
# In case a message came from a group chat
group_id = message.group_info.get("groupId")
if group_id:
self.send_group_message(recipient_group_id=group_id, text=reply)
else:
self.send_message(recipient=message.source, text=reply)
if stop:
# We don't want to continue matching things.
break

View file

@ -0,0 +1,23 @@
import attr
@attr.s
class Attachment:
content_type = attr.ib(type=str)
id = attr.ib(type=str)
size = attr.ib(type=int)
stored_filename = attr.ib(type=str)
@attr.s
class Message:
username = attr.ib(type=str)
source = attr.ib(type=str)
text = attr.ib(type=str)
source_device = attr.ib(type=int, default=0)
timestamp = attr.ib(type=int, default=None)
timestamp_iso = attr.ib(type=str, default=None)
expiration_secs = attr.ib(type=int, default=0)
attachments = attr.ib(type=list, default=[])
quote = attr.ib(type=str, default=None)
group_info = attr.ib(type=dict, default={})

View file

@ -1,16 +1,21 @@
#!/usr/bin/python3
# Makes a periodic requests to signal-cli-api and writes incoming messages to RabbitMQ
import json
import sys
import requests
import time
sys.path.append("./pysignald")
from pysignald import Signal
import json
import pika
import os
from threading import Thread
rabbitmq_host = os.environ.get('RABBITMQ_HOST')
SOCKET_PATH = os.environ.get('SOCKET_PATH')
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host))
@ -24,33 +29,6 @@ def get_registrations():
regs = json.load(regfile)
return regs
def receive_message(url, regid):
headers = {'Content-Type': 'application/json'}
url = url + "/v1/receive/" + regid
message_list = []
try:
r = requests.get(url, headers=headers)
messages = r.json()
for message in messages:
try:
text = message['envelope']['dataMessage']['message']
timestamp = message['envelope']['dataMessage']['timestamp']
source = message['envelope']['source']
message_list.append([text, timestamp, source])
except:
pass # This could be media message, or just some internal communication
except:
print("Unable to get messages from Signal API")
connection.sleep(30)
pass
if message_list:
return(message_list)
else:
return False
def save_message(message):
@ -58,7 +36,6 @@ def save_message(message):
channel = connection.channel()
channel.queue_declare(queue='signal-receive')
message_json = json.dumps(message)
channel.basic_publish(exchange='',
@ -68,18 +45,29 @@ def save_message(message):
connection.close()
def get_messages(regid):
s = Signal(regid, socket_path=SOCKET_PATH)
for message in s.receive_messages():
# For testing purposes, just echo message back
# Example message
# Message(username='+420777811038', source={'number': '+420606130958'}, text='Now it works!', source_device=1, timestamp=1601302777104, timestamp_iso='2020-09-28T14:19:37.104Z', expiration_secs=0, attachments=[], quote=None, group_info={})
msg = [
message.text,
message.timestamp,
message.source['number'],
message.username
]
save_message(msg)
def main():
#rabbitmq_connect()
regs = get_registrations()
if regs:
while True:
for reg in regs:
messages = receive_message(reg['url'], reg['id'])
if messages:
for message in messages:
message.append(reg['id'])
print(message)
save_message(message)
regid = reg["id"]
print("Starting thread for number " + regid)
Thread(target=get_messages,args=(regid,)).start()
else:
print("Missing registration list.")
connection.close()

View file

@ -1,2 +1,2 @@
pika
requests
attrs

View file

@ -1,6 +1,5 @@
[
{
"url": "http://signal-api:8080",
"id": "+1123456789"
}
]

View file

@ -1,2 +1,2 @@
pika
requests
attrs

View file

@ -4,10 +4,14 @@
import pika
import json
import requests
import os
import sys
sys.path.append("./pysignald")
from pysignald import Signal
rabbitmq_host = os.environ.get('RABBITMQ_HOST')
SOCKET_PATH = os.environ.get('SOCKET_PATH')
connection = pika.BlockingConnection(pika.ConnectionParameters(rabbitmq_host))
@ -25,20 +29,11 @@ def send_message(text, sender, destination):
for reg in regs:
if sender in reg['id']:
sender_id = reg['id']
sender_url = reg['url']
sender_found = True
if sender_found:
dst = []
dst.append(destination)
data = {
"message": text,
"number": sender_id,
"recipients": dst
}
url = sender_url + "/v2/send"
headers = {'Content-Type': 'application/json'}
r = requests.post(url, headers=headers, data=json.dumps(data))
s = Signal(sender_id, socket_path=SOCKET_PATH)
s.send_message(destination, text)
return True
else:

View file

@ -51,6 +51,7 @@ channel.basic_consume('signal-receive', callback, auto_ack=False)
try:
print("starting consuming")
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()