Base Framework
testsuite/echod.cpp
/***************************************************************************
The Base Framework (Test Suite)
A framework for developing platform independent applications
See COPYRIGHT.txt for details.
This framework is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
For the licensing terms refer to the file 'LICENSE'.
***************************************************************************/
#include <base/Application.h>
#include <base/UnsignedInteger.h>
#include <base/collection/List.h>
#include <base/collection/Queue.h>
#include <base/concurrency/Semaphore.h>
#include <base/concurrency/Thread.h>
#include <base/net/InetEndPoint.h>
#include <base/net/InetService.h>
#include <base/net/ServerSocket.h>
#include <base/string/FormatOutputStream.h>
#include <base/io/EndOfFile.h>
using namespace com::azure::dev::base;
Semaphore semaphore;
class EchoThread : public Runnable {
private:
static const unsigned int BUFFER_SIZE = 4096;
public:
EchoThread()
: buffer(BUFFER_SIZE) {
}
void run() {
while (!thread->isTerminated()) {
semaphore.wait(); // wait for job
StreamSocket* job = jobs.pop();
if (!job) { // should we terminate
break;
}
StreamSocket socket = *job; // dereference for convenience
try {
bool ended = false;
while (!thread->isTerminated() && !ended) {
bool event = socket.wait(250000);
if (!event) {
continue;
}
while (true) {
unsigned int bytesToRead = buffer.getSize();
unsigned int bytesRead = socket.read(buffer.getElements(), bytesToRead, true);
if (bytesRead == 0) {
ended = true;
break;
}
socket.write(buffer.getElements(), bytesRead);
if (bytesRead < buffer.getSize()) {
break;
}
}
}
socket.close();
} catch (IOException&) {
}
}
}
virtual ~EchoThread() {
}
};
class ContextBinder : public Object {
private:
EchoThread runnable; // must be initialized before context
Thread context;
public:
ContextBinder()
: runnable(),
context(&runnable) {
}
void start() {
context.start();
}
~ContextBinder() {
context.terminate(); // ask context to terminate
context.join(); // wait for context to complete
}
};
class EchoServiceApplication : public Application {
private:
static const unsigned int MAJOR_VERSION = 1;
static const unsigned int MINOR_VERSION = 0;
static const unsigned short ECHO_SERVICE_PORT = 7;
static const unsigned int CONCURRENCY = 16;
unsigned short port = 0;
enum Command {
HELP,
ECHO_SERVICE
};
public:
EchoServiceApplication()
: Application("echod")
{
}
void help()
{
fout << getFormalName() << " version "
<< MAJOR_VERSION << '.' << MINOR_VERSION << EOL
<< "The Base Framework (Test Suite)" << EOL
<< ENDL;
fout << getFormalName() << " [--help] [--port PORT]" << ENDL;
}
bool accept(const InetEndPoint& endPoint) {
// fout << "Incoming connection: " << endPoint << ENDL;
return true;
}
void echod() {
for (unsigned int i = 0; i < CONCURRENCY; ++i) {
ContextBinder* temp = new ContextBinder();
threadPool.add(temp);
temp->start();
}
try {
InetAddress address;
ServerSocket serverSocket(address, port, 1);
while (!thread->isTerminated()) {
if (serverSocket.wait(250)) {
StreamSocket clientSocket = serverSocket.accept(100);
if (clientSocket) {
// TAG: add thread to pool if required?
InetEndPoint endPoint(
clientSocket.getAddress(),
clientSocket.getPort()
);
if (accept(endPoint)) {
jobs.push(new StreamSocket(clientSocket));
semaphore.post(); // notify one thread
} else {
clientSocket.shutdownOutputStream();
clientSocket.close();
}
}
}
}
} catch (IOException& e) {
ferr << "Error: " << e.getMessage() << ENDL;
setExitCode(EXIT_CODE_ERROR);
}
for (unsigned int i = 0; i < threadPool.getSize(); ++i) {
jobs.push(0); // queue dummy job to terminate context
semaphore.post(); // notify
}
while (enu.hasNext()) {
delete enu.next();
}
}
void main() {
String service;
Command command = ECHO_SERVICE;
const Array<String> arguments = getArguments();
if (arguments.getSize() > 0) {
while (enu.hasNext()) {
String argument = enu.next();
if (argument == "--help") {
command = HELP;
} else if (argument == "--port") {
String temp = enu.next();
try {
UnsignedInteger value(temp);
if (value > 0xffff) {
ferr << "Error: " << "Invalid port" << ENDL;
setExitCode(EXIT_CODE_ERROR);
return;
}
port = value;
} catch (InvalidFormat&) {
try {
InetService service(temp);
port = service.getPort();
} catch (ServiceNotFound& e) {
ferr << "Error: " << e.getMessage() << ENDL;
setExitCode(EXIT_CODE_ERROR);
return;
}
}
} else {
ferr << "Error: " << "Invalid argument" << ENDL;
setExitCode(EXIT_CODE_ERROR);
return;
}
}
}
switch (command) {
case HELP:
help();
break;
case ECHO_SERVICE:
echod();
break;
}
}
~EchoServiceApplication() {
}
};
APPLICATION_STUB(EchoServiceApplication);
Thread::start
void start()
ServiceNotFound
Internet Protocol service exception.
Definition: ServiceNotFound.h:28
InetEndPoint
Internet end point.
Definition: InetEndPoint.h:30
IOException
IO exception.
Definition: IOException.h:32
Socket::accept
bool accept(Socket &socket)
StreamSocket::getAddress
const InetAddress & getAddress() const noexcept
Definition: StreamSocket.h:155
Queue::push
void push(const TYPE &value)
Definition: Queue.h:240
Thread::join
bool join() const
StreamSocket::close
void close()
Definition: StreamSocket.h:116
Runnable
Base class of active objects.
Definition: Runnable.h:32
Allocator< uint8 >
Semaphore::wait
void wait() const
StreamSocket::write
unsigned int write(const uint8 *buffer, unsigned int size, bool nonblocking=false)
Definition: StreamSocket.h:380
Thread::getThread
static Thread * getThread() noexcept
Thread
Thread.
Definition: Thread.h:77
StreamSocket
Stream socket.
Definition: StreamSocket.h:33
StreamSocket::getPort
unsigned short getPort() const noexcept
Definition: StreamSocket.h:163
List::add
void add(const TYPE &value)
Definition: List.h:695
Thread::terminate
void terminate() noexcept
UnsignedInteger
Unsigned integer.
Definition: UnsignedInteger.h:31
String
String.
Definition: String.h:102
Thread::isTerminated
bool isTerminated() const noexcept
Definition: Thread.h:498
StreamSocket::shutdownOutputStream
void shutdownOutputStream()
Definition: StreamSocket.h:211
Allocator::getSize
MemorySize getSize() const noexcept
Definition: Allocator.h:533
Array::getSize
MemorySize getSize() const noexcept
Definition: Array.h:339
DoubleLinkedNodeEnumerator::next
Reference next()
Definition: DoubleLinkedNode.h:574
Object
Object.
Definition: Object.h:28
InvalidFormat
Invalid formation exception.
Definition: InvalidFormat.h:29
Allocator::getElements
TYPE * getElements() noexcept
Definition: Allocator.h:427
Application
Application.
Definition: Application.h:53
DoubleLinkedNodeEnumerator
Modifying enumerator of DoubleLinkedNode.
Definition: DoubleLinkedNode.h:533
ServerSocket
Server socket.
Definition: ServerSocket.h:30
List
List collection.
Definition: List.h:45
StreamSocket::wait
void wait() const
Definition: StreamSocket.h:391
Queue::pop
Value pop()
Definition: Queue.h:251
Semaphore
Semaphore synchronization object.
Definition: Semaphore.h:34
Array::getReadEnumerator
ReadEnumerator getReadEnumerator() const noexcept
Definition: Array.h:489
StreamSocket::read
unsigned int read(uint8 *buffer, unsigned int size, bool nonblocking=false)
Definition: StreamSocket.h:365
Queue
Queue.
Definition: Queue.h:36
Array< String >
InetService
Internet Protocol service.
Definition: InetService.h:31
DoubleLinkedNodeEnumerator::hasNext
bool hasNext() const noexcept
Definition: DoubleLinkedNode.h:566
Semaphore::post
void post()
InetAddress
Internet Protocol address.
Definition: InetAddress.h:38
Exception::getMessage
const char * getMessage() const noexcept
Definition: Exception.h:234
List::getEnumerator
Enumerator getEnumerator()
Definition: List.h:609
List::getSize
MemorySize getSize() const noexcept
Definition: List.h:553